diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 2d7f7548..00000000 --- a/.travis.yml +++ /dev/null @@ -1,45 +0,0 @@ -language: python - -matrix: - include: - - python: 3.7 - dist: xenial - sudo: true - env: TOXENV=dist - - python: 2.7 - - python: pypy - - python: 3.5 - - python: pypy3.5 - - python: 3.6 - - python: 3.7 - dist: xenial - sudo: true - - python: 3.8 - - python: 3.7 - dist: xenial - sudo: true - env: TOXENV=old_tests - - python: 3.6 - env: TOXENV=mypy - - python: 3.9 - - python: 3.10 - - allow_failures: - - python: 3.6 # until we solve all typing issues - env: TOXENV=mypy - -install: - - pip install tox tox-travis - -script: - - tox - - tox -e codecov - -deploy: - provider: pypi - username: __token__ - skip_existing: true - skip_cleanup: true - on: - tags: true - branch: master diff --git a/README.md b/README.md index 28213d4c..f0f96321 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,6 @@ ## **Canmatrix** is a python package to read and write several CAN (Controller Area Network) database formats. ## [![PyPI](https://img.shields.io/pypi/v/canmatrix.svg)](https://pypi.org/project/canmatrix/) [![PyPI - Python Version](https://img.shields.io/pypi/pyversions/canmatrix.svg)](https://pypi.org/project/canmatrix/) -[![Build Status](https://travis-ci.org/ebroecker/canmatrix.svg?branch=development)](https://travis-ci.org/ebroecker/canmatrix) [![Codecov branch](https://img.shields.io/codecov/c/github/ebroecker/canmatrix/development.svg)](https://codecov.io/gh/ebroecker/canmatrix/) [![GitHub issues](https://img.shields.io/github/issues-raw/ebroecker/canmatrix.svg)](https://github.com/ebroecker/canmatrix/issues) diff --git a/appveyor.yml b/appveyor.yml index 1cdeb6cf..f993c54e 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -7,15 +7,11 @@ skip_branch_with_pr: true environment: matrix: - - TOXENV: py27 - - TOXENV: py35 - - TOXENV: py36 - TOXENV: py37 - TOXENV: py38 - TOXENV: py39 - TOXENV: py310 - - TOXENV: pypy - - TOXENV: pypy3 + - TOXENV: py311 - TOXENV: dist matrix: @@ -28,7 +24,6 @@ init: - ps: if (Get-ChildItem Env:ENABLE_RDP -ErrorAction SilentlyContinue) {iex ((new-object net.webclient).DownloadString('https://raw.githubusercontent.com/appveyor/ci/master/scripts/enable-rdp.ps1'))} else {echo RDP not enabled} install: - - if "%TOXENV%"=="pypy" choco install python.pypy - if "%TOXENV%"=="pypy3" choco install python.pypy3 - py -m pip install tox - ps: Update-AppveyorBuild -Version "v$(py get_version.py) b$Env:APPVEYOR_BUILD_NUMBER" diff --git a/docs/cli.rst b/docs/cli.rst index aa67c528..9b547e63 100644 --- a/docs/cli.rst +++ b/docs/cli.rst @@ -193,6 +193,18 @@ this will remove frames ``myFrame`` and ``myFrame2`` in ``source.dbc`` and store this will load ``source.dbc`` and rename frames ``myFrame`` in ``myNewFrame`` and ``myFrame2`` in ``myNewFrame2``. The result is stored in ``target.dlc``. +**compress Frame:** + +:: + + $ canconvert --compressFrame=myFrame,myFrame2,someFrames* source.dbc target.dbc + +this will load ``source.dbc`` and compress frames ``myFrame`` in ``myFrame2`` and all frames starting with ``someFrames``. +compress means, it tries to fill gaps between signals. +Works only for frames which have only big_endian signals or frames which have only little_endian singals +Frame name could be * which will compress all frames +The result is stored in ``target.dlc``. + **delete Signal:** @@ -365,6 +377,10 @@ ____________________ rename Frame form databases. (comma separated list) Syntax: --renameFrame=myOldFrame:myNewFrame,mySecondFrame:mySecondNewFrame + --compressFrame=FRAME + + compress Frame form databases. Syntax: --compressFrame=frame1,frame2,* + --deleteSignal=DELETESIGNAL delete Signal form databases. (comma separated list) Syntax: --deleteSignal=mySignal1,mySecondSignal diff --git a/docs/contents.rst b/docs/contents.rst index 8b137891..21d7cc02 100644 --- a/docs/contents.rst +++ b/docs/contents.rst @@ -1 +1,4 @@ +Canmatrix Documentation +======================= + diff --git a/setup.py b/setup.py index 29e6edb5..16368ce9 100644 --- a/setup.py +++ b/setup.py @@ -84,6 +84,7 @@ "click", "enum34; python_version < '3.4'", "future", + "importlib-metadata; python_version < '3.8'", "six", "typing; python_version < '3.5'", ], diff --git a/src/canmatrix/cancluster.py b/src/canmatrix/cancluster.py index df5e517b..54155bff 100644 --- a/src/canmatrix/cancluster.py +++ b/src/canmatrix/cancluster.py @@ -14,6 +14,8 @@ def __init__(self, *arg, **kw): self._frames = [] # type: typing.List[canmatrix.Frame] self._signals = [] # type: typing.List[canmatrix.Signal] self._ecus = [] # type: typing.List[canmatrix.Ecu] + self._pdu_gateway_list = [] # type: typing.List[dict[str, str]] + self._signal_gateway_list = [] # type: typing.List[dict[str, str]] self.update() def update_frames(self): # type: () -> typing.MutableSequence[canmatrix.Frame] @@ -82,3 +84,45 @@ def signals(self): # type: () -> typing.MutableSequence[canmatrix.Signal] if not self._signals: self.update_signals() return self._signals + + def pdu_gateway(self, pdu_gateway_list=[]): + self._pdu_gateway_list += pdu_gateway_list + return self._pdu_gateway_list + + def signal_gateway(self, signal_gateway_list=[]): + self._signal_gateway_list += signal_gateway_list + return self._signal_gateway_list + + def get_pdu_routing_info(self, pdu_name, strict_search=False): + routing_source = [] + routing_target = [] + if strict_search: + for pdu in self._pdu_gateway_list: + if pdu_name == pdu["source"]: + routing_source.append({"pdu": pdu["target"], "cluster": pdu["target_cluster"], "ecu": pdu["ecu"], "type": pdu["target_type"]}) + if pdu_name == pdu["target"]: + routing_target.append({"pdu": pdu["source"], "cluster": pdu["source_cluster"], "ecu": pdu["ecu"], "type": pdu["source_type"]}) + else: + for pdu in self._pdu_gateway_list: + if pdu_name in pdu["source"]: + routing_source.append({"pdu": pdu["target"], "cluster": pdu["target_cluster"], "ecu": pdu["ecu"], "type": pdu["target_type"]}) + if pdu_name in pdu["target"]: + routing_target.append({"pdu": pdu["source"], "cluster": pdu["source_cluster"], "ecu": pdu["ecu"], "type": pdu["source_type"]}) + return {"source": routing_source, "target": routing_target} + + def get_signal_routing_info(self, signal_name, strict_search=False): + routing_source = [] + routing_target = [] + if strict_search: + for signal_gw in self._signal_gateway_list: + if signal_name == signal_gw["source"]: + routing_source.append({"signal": signal_gw["target"], "cluster": signal_gw["target_cluster"], "ecu": signal_gw["ecu"], "type": signal_gw["target_type"]}) + if signal_name == signal_gw["target"]: + routing_target.append({"signal": signal_gw["source"], "cluster": signal_gw["source_cluster"], "ecu": signal_gw["ecu"], "type": signal_gw["source_type"]}) + else: + for signal_gw in self._signal_gateway_list: + if signal_name in signal_gw["source"]: + routing_source.append({"signal": signal_gw["target"], "cluster": signal_gw["target_cluster"], "ecu": signal_gw["ecu"], "type": signal_gw["target_type"]}) + if signal_name in signal_gw["target"]: + routing_target.append({"signal": signal_gw["source"], "cluster": signal_gw["source_cluster"], "ecu": signal_gw["ecu"], "type": signal_gw["source_type"]}) + return {"source": routing_source, "target": routing_target} diff --git a/src/canmatrix/canmatrix.py b/src/canmatrix/canmatrix.py index 138dab07..ac80a9f4 100644 --- a/src/canmatrix/canmatrix.py +++ b/src/canmatrix/canmatrix.py @@ -34,6 +34,7 @@ import logging import math import struct +import sys import typing import warnings from builtins import * @@ -46,8 +47,14 @@ import canmatrix.types import canmatrix.utils -if attr.__version__ < '17.4.0': # type: ignore +if sys.version_info < (3, 8): + from importlib_metadata import version +else: + from importlib.metadata import version + +if version("attrs") < '17.4.0': raise RuntimeError("need attrs >= 17.4.0") + logger = logging.getLogger(__name__) defaultFloatFactory = decimal.Decimal # type: typing.Callable[[typing.Any], canmatrix.types.PhysicalValue] @@ -63,7 +70,7 @@ class MissingMuxSignal(ExceptionTemplate): pass class DecodingComplexMultiplexed(ExceptionTemplate): pass class DecodingFrameLength(ExceptionTemplate): pass class ArbitrationIdOutOfRange(ExceptionTemplate): pass -class J1939needsExtendedIdetifier(ExceptionTemplate): pass +class J1939NeedsExtendedIdentifier(ExceptionTemplate): pass class DecodingConatainerPdu(ExceptionTemplate): pass class EncodingConatainerPdu(ExceptionTemplate): pass @@ -106,7 +113,12 @@ def add_attribute(self, attribute, value): # type (attribute: str, value: typin :param str attribute: Attribute name :param any value: Attribute value """ - self.attributes[attribute] = value + try: + self.attributes[attribute] = str(value) + except UnicodeDecodeError: + self.attributes[attribute] = value + if type(self.attributes[attribute]) == str: + self.attributes[attribute] = self.attributes[attribute].strip() def del_attribute(self, attribute): if attribute in self.attributes: @@ -140,7 +152,7 @@ class Signal(object): * factor, offset, min, max * receivers (ECU Name) * attributes, _values, unit, comment - * _multiplex ('Multiplexor' or Number of Multiplex) + * multiplex ('Multiplexor' or Number of Multiplex) """ name = attr.ib(default="") # type: str @@ -218,6 +230,7 @@ def multiplex_setter(self, value): self.mux_val = int(value) elif value == 'Multiplexor': self.is_multiplexer = True + self.multiplex = 'Multiplexor' ret_multiplex = value return ret_multiplex @@ -282,7 +295,12 @@ def add_attribute(self, attribute, value): :param str attribute: attribute name :param value: attribute value """ - self.attributes[attribute] = value + try: + self.attributes[attribute] = str(value) + except UnicodeDecodeError: + self.attributes[attribute] = value + if type(self.attributes[attribute]) == str: + self.attributes[attribute] = self.attributes[attribute].strip() def del_attribute(self, attribute): """ @@ -418,7 +436,7 @@ def phys2raw(self, value=None): for value_key, value_string in self.values.items(): if value_string == value: value = value_key - break + return value else: raise ValueError( "{} is invalid value choice for {}".format(value, self) @@ -429,10 +447,10 @@ def phys2raw(self, value=None): "Value {} is not valid for {}. Min={} and Max={}".format( value, self, self.min, self.max) ) - raw_value = (value - self.offset) / self.factor + raw_value = (self.float_factory(value) - self.float_factory(self.offset)) / self.float_factory(self.factor) if not self.is_float: - raw_value = int(raw_value) + raw_value = int(round(raw_value)) return raw_value def raw2phys(self, value, decode_to_str=False): @@ -445,12 +463,14 @@ def raw2phys(self, value, decode_to_str=False): """ if self.is_float: value = self.float_factory(value) - result = value * self.factor + self.offset # type: typing.Union[canmatrix.types.PhysicalValue, str] if decode_to_str: for value_key, value_string in self.values.items(): - if value_key == result: - result = value_string + if value_key == value: + return value_string break + + result = value * self.factor + self.offset # type: typing.Union[canmatrix.types.PhysicalValue, str] + return result def __str__(self): # type: () -> str @@ -631,7 +651,7 @@ def j1939_pgn(self): @property def pgn(self): if not self.extended: - raise J1939needsExtendedIdetifier + raise J1939NeedsExtendedIdentifier # PGN is bits 8-25 of the 29-Bit Extended CAN-ID # Made up of PDU-S (8-15), PDU-F (16-23), Data Page (24) & Extended Data Page (25) # If PDU-F >= 240 the PDU-S is interpreted as Group Extension @@ -665,7 +685,7 @@ def j1939_tuple(self): # type: () -> typing.Tuple[int, int, int] @property def j1939_destination(self): if not self.extended: - raise J1939needsExtendedIdetifier + raise J1939NeedsExtendedIdentifier if self.j1939_pdu_format == 1: destination = self.j1939_ps else: @@ -675,7 +695,7 @@ def j1939_destination(self): @property def j1939_source(self): if not self.extended: - raise J1939needsExtendedIdetifier + raise J1939NeedsExtendedIdentifier return self.id & 0xFF @j1939_source.setter @@ -686,13 +706,13 @@ def j1939_source(self, value): # type: (int) -> None @property def j1939_ps(self): if not self.extended: - raise J1939needsExtendedIdetifier + raise J1939NeedsExtendedIdentifier return (self.id >> 8) & 0xFF @property def j1939_pf(self): if not self.extended: - raise J1939needsExtendedIdetifier + raise J1939NeedsExtendedIdentifier return (self.id >> 16) & 0xFF @property @@ -702,19 +722,19 @@ def j1939_pdu_format(self): @property def j1939_dp(self): if not self.extended: - raise J1939needsExtendedIdetifier + raise J1939NeedsExtendedIdentifier return (self.id >> 24) & 0x1 @property def j1939_edp(self): if not self.extended: - raise J1939needsExtendedIdetifier + raise J1939NeedsExtendedIdentifier return (self.id >> 25) & 0x1 @property def j1939_priority(self): if not self.extended: - raise J1939needsExtendedIdetifier + raise J1939NeedsExtendedIdentifier return (self.id >> 26) & 0x7 @j1939_priority.setter @@ -789,6 +809,7 @@ def add_signal(self, signal): """ self.signals.append(signal) return self.signals[len(self.signals) - 1] + def add_signal_group(self, Name, Id, signalNames, e2e_trans=None): # type: (str, int, typing.Sequence[str]) -> None """Add new SignalGroup to the Frame. Add given signals to the group. @@ -873,6 +894,7 @@ class Frame(object): pdus = attr.ib(factory=list) # type: typing.MutableSequence[Pdu] header_id = attr.ib(default=None) #type: int # header_id + pdu_name = attr.ib(default="") # type: str @property def is_multiplexed(self): # type: () -> bool @@ -1533,6 +1555,54 @@ def decode(self, data): else: return decoded + + def _compress_little(self): + for signal in self.signals: + if not signal.is_little_endian: + return + gap_found = True + while gap_found: + gap_found = False + layout = self.get_frame_layout() + gap_len = None + for byte in range(len(layout)//8): + for bit in range(7,-1,-1): + bit_nr = byte*8+bit + signal_list = layout[bit_nr] + if signal_list == []: + if gap_len is None: + gap_len = 1 + else: + gap_len += 1 + else: + if gap_len is not None: + signal = layout[bit_nr][0] + signal.start_bit -= gap_len + gap_found = True + break + if gap_found: + break + + def compress(self): + for signal in self.signals: + if signal.is_little_endian: + return self._compress_little() + gap_found = True + while gap_found: + gap_found = False + layout = self.get_frame_layout() + free_start = None + for bit_nr, signal_list in enumerate(layout): + if signal_list == []: + if free_start is None: + free_start = bit_nr + else: + if free_start is not None: + signal = layout[bit_nr][0] + signal.start_bit = free_start + gap_found = True + break + def __str__(self): # type: () -> str """Represent the frame by its name only.""" @@ -1715,7 +1785,12 @@ def add_attribute(self, attribute, value): # type: (str, typing.Any) -> None :param str attribute: attribute name :param value: attribute value """ - self.attributes[attribute] = value + try: + self.attributes[attribute] = str(value) + except UnicodeDecodeError: + self.attributes[attribute] = value + if type(self.attributes[attribute]) == str: + self.attributes[attribute] = self.attributes[attribute].strip() def add_signal_defines(self, type, definition): """ diff --git a/src/canmatrix/cli/convert.py b/src/canmatrix/cli/convert.py index ac226cfc..e8caa851 100644 --- a/src/canmatrix/cli/convert.py +++ b/src/canmatrix/cli/convert.py @@ -65,6 +65,7 @@ def get_formats(): @click.option('--deleteSignalAttributes', 'deleteSignalAttributes', help="delete attributes from all signals\nExample --deleteSignalAttributes GenMsgSomeVar,CycleTime") @click.option('--deleteFrame', 'deleteFrame', help="delete Frame form databases. (comma separated list)\nSyntax: --deleteFrame=myFrame1,mySecondFrame") @click.option('--renameFrame', 'renameFrame', help="increment each frame.id in database by increment\nSyntax: --frameIdIncrement=increment") +@click.option('--compressFrame', 'compressFrame', help="remove gaps between signals in Frame. (comma separated list of Frames)\nSyntax: --compressFrame=myFrame1,mySecondFrame") @click.option('--addFrameReceiver', 'addFrameReceiver', help="add receiver Ecu to frame(s) (comma separated list)\nSyntax: --addFrameReceiver=framename:myNewEcu,mySecondEcu:myNEWEcu") @click.option('--changeFrameId', 'changeFrameId', help="change frame.id in database\nSyntax: --changeFrameId=oldId:newId") @click.option('--setFrameFd', 'setFrameFd', help="set Frame from database to canfd. (comma separated list)\nSyntax: --setFrameFd=myFrame1,mySecondFrame") @@ -78,13 +79,14 @@ def get_formats(): @click.option('--signals', help="Copy only given Signals (comma separated list) to target matrix just as 'free' signals without containing frame") @click.option('--merge', help="merge additional can databases.\nSyntax: --merge filename[:ecu=SOMEECU][:frame=FRAME1][:frame=FRAME2],filename2") @click.option('--ignorePduContainer/--no-ignorePduContainer', 'ignorePduContainer', default = False, help="Ignore any Frame with PDU container; if no export as multiplexed Frames\ndefault False") -@click.option('--calcSignalMax/--no-calcSignalMax', 'calcSignalMax', default = False, help="Calculate Signals Maximum Physical Value; If maximum value is set to 0\ndefault False") -@click.option('--recalcSignalMax/--no-recalcSignalMax', 'recalcSignalMax', default = False, help="Recalculate Signals Maximum Physical Value for the entire database\ndefault False") +@click.option('--calcSignalMaximumsWhereZero/--no-calcSignalMaximumsWhereZero', 'calcSignalMaximumsWhereZero', default = False, help="Calculate Signals Maximum Physical Value; If maximum value is set to 0\ndefault False") +@click.option('--recalcSignalMaximums/--no-recalcSignalMaximums', 'recalcSignalMaximums', default = False, help="Recalculate Signals Maximum Physical Value for the entire database\ndefault False") @click.option('--deleteFloatingSignals/--no-deleteFloatingSignals', 'deleteFloatingSignals', default = False, help="if deleteFloatingSignals is set , then unassigned signals to a frame/message will be deleted \tdefault: False") +@click.option('--recalcSignalMinimums/--no-recalcSignalMinimums', 'recalcSignalMinimums', default = False, help="Recalculate Signals Minimum Physical Value for the entire database\ndefault False") #Frame/Signal Check switches @click.option('--checkFloatingFrames/--no-checkFloatingFrames', 'checkFloatingFrames', default = False, help="if checkFloatingFrames is set, CAN message/frame without sender node will be warned .\tdefault: False") -@click.option('--checkSignalRange/--no-checkSignalRange', 'checkSignalRange', default = False, help="if checkSignalRange is set, then signals consisting raw min/max value set to 0 will be warned. \tdefault: False ") +@click.option('--warnSignalMinMaxSame/--no-warnSignalMinMaxSame', 'warnSignalMinMaxSame', default = False, help="if warnSignalMinMaxSame is set, then signals consisting same raw min & max value will be warned. \tdefault: False ") @click.option('--checkSignalUnit/--no-checkSignalUnit', 'checkSignalUnit', default = False, help="if checkSignalUnit is set , then signals without units and value table will be warned. \tdefault: False") @click.option('--checkSignalReceiver/--no-checkSignalReceiver', 'checkSignalReceiver', default = False, help="if checkSignalReceiver is set, then signals without an assigned Receiver will be warned \tdefault: False") @click.option('--checkFloatingSignals/--no-checkFloatingSignals', 'checkFloatingSignals', default = False, help="if checkFloatingSignals is set, then unassigned signals to a frame/message will be warned \tdefault: False") diff --git a/src/canmatrix/convert.py b/src/canmatrix/convert.py index b7ae9015..7e329b87 100644 --- a/src/canmatrix/convert.py +++ b/src/canmatrix/convert.py @@ -132,6 +132,7 @@ def convert(infile, out_file_name, **options): # type: (str, str, **str) -> Non for renameTuple in rename_tuples: old, new = renameTuple.split(':') db.rename_frame(old, new) + if 'deleteFrame' in options and options['deleteFrame'] is not None: delete_frame_names = options['deleteFrame'].split(',') for frame_name in delete_frame_names: @@ -227,6 +228,13 @@ def convert(infile, out_file_name, **options): # type: (str, str, **str) -> Non 'deleteObsoleteEcus']: db.delete_obsolete_ecus() + if 'compressFrame' in options and options['compressFrame'] is not None: + frames_cmdline = options['compressFrame'].split(',') + for frame_name in frames_cmdline: + frames = db.glob_frames(frame_name) + for frame in frames: + frame.compress() + if 'recalcDLC' in options and options['recalcDLC']: db.recalc_dlc(options['recalcDLC']) @@ -251,19 +259,29 @@ def convert(infile, out_file_name, **options): # type: (str, str, **str) -> Non for signal in [b for a in db for b in a.signals]: signal.name = signal.attributes.get(options.get('signalNameFromAttrib'), signal.name) + if options.get('frameNameFromAttrib') is not None: + for frame in db: + frame.name = frame.attributes.get(options.get('frameNameFromAttrib'), frame.name) + # Max Signal Value Calculation , if max value is 0 - if options.get('calcSignalMax') is not None and options['calcSignalMax']: + if options.get('calcSignalMaximumsWhereZero') is not None and options['calcSignalMaximumsWhereZero']: for signal in [b for a in db for b in a.signals]: if signal.max == 0 or signal.max is None: signal.calc_max_for_none = True signal.set_max(None) # Max Signal Value Calculation - if options.get('recalcSignalMax') is not None and options['recalcSignalMax']: + if options.get('recalcSignalMaximums') is not None and options['recalcSignalMaximums']: for signal in [b for a in db for b in a.signals]: signal.calc_max_for_none = True signal.set_max(None) + # Min Signal Value Calculation + if options.get('recalcSignalMinimums') is not None and options['recalcSignalMinimums']: + for signal in [b for a in db for b in a.signals]: + signal.calc_min_for_none = True + signal.set_min(None) + # Delete Unassigned Signals to a Valid Frame/Message if options.get('deleteFloatingSignals') is not None and options['deleteFloatingSignals']: for frame in db.frames: @@ -290,21 +308,21 @@ def convert(infile, out_file_name, **options): # type: (str, str, **str) -> Non # Check & Warn for Frame/Messages without Transmitter Node if options.get('checkFloatingFrames') is not None and options['checkFloatingFrames']: for frame in db.frames: - if len(frame.transmitters) is 0: + if len(frame.transmitters) == 0: logger.warning("No Transmitter Node Found for Frame %s", frame.name) # Check & Warn for Signals with Min/Max set to 0 - if options.get('checkSignalRange') is not None and options['checkSignalRange']: + if options.get('warnSignalMinMaxSame') is not None and options['warnSignalMinMaxSame']: for frame in db.frames: for signal in frame.signals: - if (signal.phys2raw(signal.max) - signal.phys2raw(signal.min)) is 0: + if (signal.phys2raw(signal.max) - signal.phys2raw(signal.min)) == 0: logger.warning("Invalid Min , Max value of %s", (frame.name+"::"+signal.name)) # Check for Signals without unit and Value table , the idea is to improve signal readability if options.get('checkSignalUnit') is not None and options['checkSignalUnit']: for frame in db.frames: for signal in frame: - if signal.unit is "" and len(signal.values) == 0: + if signal.unit == "" and len(signal.values) == 0: logger.warning("Please add value table for the signal %s or add appropriate Unit", (frame.name+"::"+signal.name)) # Convert dbc from J1939 to Extended format @@ -317,7 +335,9 @@ def convert(infile, out_file_name, **options): # type: (str, str, **str) -> Non if options.get('convertToJ1939') is not None and options['convertToJ1939']: for frame in db.frames: frame.is_j1939=True - db.add_attribute("ProtocolType","J1939") + db.add_attribute("ProtocolType", "J1939") + + logger.info(name) logger.info("%d Frames found" % (db.frames.__len__())) diff --git a/src/canmatrix/formats/arxml.py b/src/canmatrix/formats/arxml.py index 3ceafc93..2501d986 100644 --- a/src/canmatrix/formats/arxml.py +++ b/src/canmatrix/formats/arxml.py @@ -35,6 +35,7 @@ import lxml.etree import canmatrix +import canmatrix.cancluster import canmatrix.types import canmatrix.utils import re @@ -51,6 +52,7 @@ _MultiplexId = typing.Union[str, int, None] _FloatFactory = typing.Callable[[typing.Any], typing.Any] + class Earxml: def __init__(self): self.xml_element_cache = dict() # type: typing.Dict[str, _Element] @@ -76,7 +78,6 @@ def fill_caches(self, start_element=None, ar_path=""): def open(self, filename): self.tree = lxml.etree.parse(filename) - self.root = self.tree.getroot() # type: _Element self.ns = "{" + self.tree.xpath('namespace-uri(.)') + "}" # type: str @@ -127,6 +128,13 @@ def get_referencable_parent(self, xml_element): xml_element = xml_element.getparent() return xml_element + def get_parent_with_name(self, xml_element, tag_name): + while xml_element != self.root: + if xml_element.tag == self.ns + tag_name: + return xml_element + xml_element = xml_element.getparent() + return None + def find_references_of_type(self, element, xml_tag, referencable_parent=False): referencing_elements = [] current_ar_path = self.get_short_name_path_of_element(element) @@ -248,7 +256,7 @@ def selector(self, start_element, selector): result_list = [start_element] last_found_token = 0 while start_pos < len(selector): - token_match = re.search(r'//|/|>>|>|<<|<|#|$', selector[start_pos:]) + token_match = re.search(r'//|/|>>|>|<<|<|#|:|$', selector[start_pos:]) found_token = token_match.span() if start_pos > 0: # at least one Token found... @@ -273,6 +281,13 @@ def selector(self, start_element, selector): result_list = [self.find_references_of_type(a, value, referencable_parent=True)[0] for a in result_list if len(self.find_references_of_type(a, value, referencable_parent=True)) > 0] + elif token == ":": + filtered_results = [] + for item in result_list: + if value == item.text: + filtered_results.append(item) + result_list = filtered_results + elif token == "#": sn_snippets = value.split("|") filtered_results = [] @@ -782,7 +797,7 @@ def dump(dbs, f, **options): compu_int_to_phys = create_sub_element( compu_method, 'COMPU-INTERNAL-TO-PHYS') compu_scales = create_sub_element(compu_int_to_phys, 'COMPU-SCALES') - for value in sorted(signal.values, key=lambda x: int(x)): + for value in sorted(signal.values, key=lambda x: int(x, 0)): compu_scale = create_sub_element(compu_scales, 'COMPU-SCALE') desc = create_sub_element(compu_scale, 'DESC') l2 = create_sub_element(desc, 'L-2') @@ -986,7 +1001,7 @@ def get_signalgrp_and_signals(sys_signal, sys_signal_array, frame, group_id, ea) } data_id_elems = ea.get_children(ea.get_child(sys_signal, "TRANSFORMATION-I-SIGNAL-PROPSS"), "DATA-ID") if data_id_elems is not None: - e2e_transform['data_ids'] = [int(x.text) for x in data_id_elems] + e2e_transform['data_ids'] = [int(x.text, 0) for x in data_id_elems] frame.add_signal_group(ea.get_element_name(sys_signal), group_id, members, e2e_transform) @@ -1016,7 +1031,14 @@ def decode_compu_method(compu_method, ea, float_factory): # keyword definition. 06Jun16 ##################################################################################################### - if ll is not None and desc is not None and canmatrix.utils.decode_number(ul.text, + if desc is None or len(desc) == 0: + vt = ea.get_sub_by_name(compu_scale, 'VT') + if vt is not None: + desc = vt.text + + rational = ea.get_child(compu_scale, "COMPU-RATIONAL-COEFFS") + + if rational is None and ll is not None and desc is not None and canmatrix.utils.decode_number(ul.text, float_factory) == canmatrix.utils.decode_number( ll.text, float_factory): ##################################################################################################### @@ -1024,7 +1046,6 @@ def decode_compu_method(compu_method, ea, float_factory): values[ll.text] = desc # scale_desc = ea.get_element_desc(compu_scale) - rational = ea.get_child(compu_scale, "COMPU-RATIONAL-COEFFS") if rational is not None: numerator_parent = ea.get_child(rational, "COMPU-NUMERATOR") numerator = ea.get_children(numerator_parent, "V") @@ -1079,7 +1100,7 @@ def ar_byteorder_is_little(in_string): return False -def get_signals(signal_array, frame, ea, multiplex_id, float_factory, bit_offset=0): +def get_signals(signal_array, frame, ea, multiplex_id, float_factory, bit_offset=0, signal_triggerings=[]): # type: (typing.Sequence[_Element], typing.Union[canmatrix.Frame, canmatrix.Pdu], Earxml, int, typing.Callable, int) -> None """Add signals from xml to the Frame.""" @@ -1109,6 +1130,16 @@ def get_signals(signal_array, frame, ea, multiplex_id, float_factory, bit_offset 'Frame %s, no isignal for %s found', frame.name, ea.get_child(signal, "SHORT-NAME").text) + receiver = [] # type: typing.List[str] + + for triggering in signal_triggerings: + try: + if ea.selector(triggering, ">I-SIGNAL-REF")[0] == isignal: + reciving_ecu_instances = ea.selector(triggering, ">>I-SIGNAL-PORT-REF//COMMUNICATION-DIRECTION:IN/../../..") + receiver = [ea.get_short_name(a) for a in reciving_ecu_instances] + except IndexError: + pass + base_type = ea.follow_ref(isignal, "BASE-TYPE-REF") # AR4 if base_type is None: a = ea.selector(isignal, ">SYSTEM-SIGNAL-REF>DATA-TYPE-REF>BASE-TYPE-REF") @@ -1132,7 +1163,7 @@ def get_signals(signal_array, frame, ea, multiplex_id, float_factory, bit_offset if system_signal is not None and "SYSTEM-SIGNAL-GROUP" in system_signal.tag: system_signals = ea.selector(system_signal, "SYSTEM-SIGNAL-REFS>>SYSTEM-SIGNAL-REF") - get_sys_signals(system_signal, system_signals, frame, group_id, ea) + get_signalgrp_and_signals(system_signal, system_signals, frame, group_id, ea) group_id = group_id + 1 continue @@ -1143,7 +1174,6 @@ def get_signals(signal_array, frame, ea, multiplex_id, float_factory, bit_offset signal_min = None # type: canmatrix.types.OptionalPhysicalValue signal_max = None # type: canmatrix.types.OptionalPhysicalValue - receiver = [] # type: typing.List[str] signal_description = ea.get_element_desc(system_signal) @@ -1218,7 +1248,7 @@ def get_signals(signal_array, frame, ea, multiplex_id, float_factory, bit_offset (is_signed, is_float) = eval_type_of_signal(type_encoding, base_type, ea) - unit_element = ea.get_child(isignal, "UNIT") + unit_element = ea.follow_ref(isignal, "UNIT-REF") display_name = ea.get_child(unit_element, "DISPLAY-NAME") if display_name is not None: signal_unit = display_name.text @@ -1269,8 +1299,8 @@ def get_signals(signal_array, frame, ea, multiplex_id, float_factory, bit_offset if start_bit is not None: new_signal = canmatrix.Signal( name, - start_bit=int(start_bit.text) + bit_offset, - size=int(length.text) if length is not None else 0, + start_bit=int(start_bit.text, 0) + bit_offset, + size=int(length.text, 0) if length is not None else 0, is_little_endian=is_little_endian, is_signed=is_signed, factor=factor, @@ -1288,7 +1318,7 @@ def get_signals(signal_array, frame, ea, multiplex_id, float_factory, bit_offset if not new_signal.is_little_endian: # startbit of motorola coded signals are MSB in arxml - new_signal.set_startbit(int(start_bit.text) + bit_offset, bitNumbering=1) + new_signal.set_startbit(int(start_bit.text, 0) + bit_offset, bitNumbering=1) communication_direction = ea.selector(isignal, "I-SIGNAL-PORT-REF/COMMUNICATION-DIRECTION") if len(communication_direction) > 0: @@ -1336,8 +1366,8 @@ def get_frame_from_multiplexed_ipdu(pdu, target_frame, multiplex_translation, ea is_signed = False # unsigned multiplexor = canmatrix.Signal( "Multiplexor", - start_bit=int(selector_start.text), - size=int(selector_len.text), + start_bit=int(selector_start.text, 0), + size=int(selector_len.text, 0), is_little_endian=is_little_endian, multiplex="Multiplexor") @@ -1439,7 +1469,7 @@ def get_frame_from_container_ipdu(pdu, target_frame, ea, float_factory, headers_ ipdu = ea.follow_ref(payload, "I-PDU-REF") logger.info("found secured pdu '%s', dissolved to '%s'", secured_i_pdu_name, ea.get_element_name(ipdu)) try: - offset = int(ea.get_child(ipdu, "OFFSET").text) * 8 + offset = int(ea.get_child(ipdu, "OFFSET").text, 0) * 8 except: offset = 0 @@ -1451,7 +1481,7 @@ def get_frame_from_container_ipdu(pdu, target_frame, ea, float_factory, headers_ pdu_port_type = ea.get_child(cpdu, "I-PDU-PORT-REF").attrib["DEST"] except (AttributeError, KeyError): pdu_port_type = "" - ipdu_length = int(ea.get_child(ipdu, "LENGTH").text) + ipdu_length = int(ea.get_child(ipdu, "LENGTH").text, 0) ipdu_name = ea.get_element_name(ipdu) ipdu_triggering_name = ea.get_element_name(cpdu) target_pdu = canmatrix.Pdu(name=ipdu_name, size=ipdu_length, id=header_id, @@ -1506,14 +1536,16 @@ def get_frame(frame_triggering, ea, multiplex_translation, float_factory, header arb_id = ea.get_child(frame_triggering, "IDENTIFIER") frame_elem = ea.follow_ref(frame_triggering, "FRAME-REF") - frame_name_elem = ea.get_child(frame_triggering, "SHORT-NAME") - logger.debug("processing Frame: %s", frame_name_elem.text) + frame_trig_name_elem = ea.get_child(frame_triggering, "SHORT-NAME") + logger.debug("processing Frame-Trigger: %s", frame_trig_name_elem.text) if arb_id is None: - logger.info("found Frame %s without arbitration id", frame_name_elem.text) + logger.info("found Frame-Trigger %s without arbitration id", frame_trig_name_elem.text) return None - arbitration_id = int(arb_id.text) + arbitration_id = int(arb_id.text, 0) + isignaltriggerings_of_current_cluster = ea.selector(frame_triggering, "/..//I-SIGNAL-TRIGGERING") if frame_elem is not None: + logger.debug("Frame: %s", ea.get_element_name(frame_elem)) if frame_elem in frames_cache: return copy.deepcopy(frames_cache[frame_elem]) dlc_elem = ea.get_child(frame_elem, "FRAME-LENGTH") @@ -1525,13 +1557,17 @@ def get_frame(frame_triggering, ea, multiplex_translation, float_factory, header pdu = ea.selector(pdu, ">PAYLOAD-REF>I-PDU-REF")[0] # logger.info("found secured pdu - no signal extraction possible: %s", get_element_name(pdu, ns)) - new_frame = canmatrix.Frame(ea.get_element_name(frame_elem), size=int(dlc_elem.text)) + new_frame = canmatrix.Frame(ea.get_element_name(frame_elem), size=int(dlc_elem.text, 0)) comment = ea.get_element_desc(frame_elem) + if pdu is not None: + new_frame.add_attribute("PduName", ea.get_short_name(pdu)) + new_frame.add_attribute("FrameTriggeringName", ea.get_short_name(frame_triggering)) + if comment is not None: new_frame.add_comment(comment) else: # without frameinfo take short-name of frametriggering and dlc = 8 - logger.debug("Frame %s has no FRAME-REF", frame_name_elem.text) + logger.debug("Frame-Trigger %s has no FRAME-REF", frame_trig_name_elem.text) pdu = ea.selector(frame_triggering, ">I-PDU-TRIGGERING-REF>I-PDU-REF") # AR4.2 if len(pdu) == 0: pdu = ea.selector(frame_triggering, ">PDU-TRIGGERING-REF>I-PDU-REF") @@ -1540,13 +1576,17 @@ def get_frame(frame_triggering, ea, multiplex_translation, float_factory, header else: pdu = None dlc_elem = ea.get_child(pdu, "LENGTH") - new_frame = canmatrix.Frame(frame_name_elem.text, arbitration_id=arbitration_id, - size=int(int(dlc_elem.text) / 8)) + new_frame = canmatrix.Frame(frame_trig_name_elem.text, arbitration_id=arbitration_id, + size=int(int(dlc_elem.text, 0) / 8)) + if pdu is not None: + new_frame.add_attribute("PduName", ea.get_short_name(pdu)) + new_frame.add_attribute("FrameTriggeringName", ea.get_short_name(frame_triggering)) if pdu is None: logger.error("pdu is None") else: - logger.debug("PDU: " + ea.get_element_name(pdu)) + new_frame.pdu_name = ea.get_element_name(pdu) + logger.debug("PDU: " + new_frame.pdu_name) if new_frame.comment is None: new_frame.add_comment(ea.get_element_desc(pdu)) @@ -1592,7 +1632,7 @@ def get_frame(frame_triggering, ea, multiplex_translation, float_factory, header else: pdu_sig_mapping = ea.selector(pdu, "//I-SIGNAL-TO-I-PDU-MAPPING") if pdu_sig_mapping: - get_signals(pdu_sig_mapping, new_frame, ea, None, float_factory) + get_signals(pdu_sig_mapping, new_frame, ea, None, float_factory, signal_triggerings=isignaltriggerings_of_current_cluster) # Seen some pdu_sig_mapping being [] and not None with some arxml 4.2 else: # AR 4.2 pdu_trigs = ea.follow_all_ref(frame_triggering, "PDU-TRIGGERINGS-REF") @@ -1611,7 +1651,7 @@ def get_frame(frame_triggering, ea, multiplex_translation, float_factory, header ea.get_element_name(ipdus)) # signal_to_pdu_map = get_children(signal_to_pdu_maps, "I-SIGNAL-TO-I-PDU-MAPPING", arDict, ns) get_signals(signal_to_pdu_maps, new_frame, ea, None, - float_factory) # todo BUG expects list, not item + float_factory, signal_triggerings=isignaltriggerings_of_current_cluster) # todo BUG expects list, not item else: logger.debug("Frame %s (assuming AR4.2) no PDU-TRIGGERINGS found", new_frame.name) if new_frame.is_pdu_container and new_frame.cycle_time == 0: @@ -1623,6 +1663,7 @@ def get_frame(frame_triggering, ea, multiplex_translation, float_factory, header new_frame.fit_dlc() if frame_elem is not None: frames_cache[frame_elem] = new_frame + return copy.deepcopy(new_frame) @@ -1664,18 +1705,18 @@ def ecuc_extract_signal(signal_node, ea): # timeout = 0 for attribute in attributes: if attribute.text.endswith("ComBitPosition"): - start_bit = int(attribute.getparent().find(".//" + ea.ns + "VALUE").text) + start_bit = int(attribute.getparent().find(".//" + ea.ns + "VALUE").text, 0) if attribute.text.endswith("ComBitSize"): - size = int(attribute.getparent().find(".//" + ea.ns + "VALUE").text) + size = int(attribute.getparent().find(".//" + ea.ns + "VALUE").text, 0) if attribute.text.endswith("ComSignalEndianness"): endianness = attribute.getparent().find(".//" + ea.ns + "VALUE").text is_little = "LITTLE_ENDIAN" in endianness if attribute.text.endswith("ComSignalInitValue"): - init_value = int(attribute.getparent().find(".//" + ea.ns + "VALUE").text) + init_value = int(attribute.getparent().find(".//" + ea.ns + "VALUE").text, 0) if attribute.text.endswith("ComSignalType"): signal_type = attribute.getparent().find(".//" + ea.ns + "VALUE").text if attribute.text.endswith("ComTimeout"): - timeout = int(attribute.getparent().find(".//" + ea.ns + "VALUE").text) + timeout = int(attribute.getparent().find(".//" + ea.ns + "VALUE").text, 0) return canmatrix.Signal(ea.get_element_name(signal_node), start_bit=start_bit, size=size, is_little_endian=is_little) @@ -1741,10 +1782,10 @@ def decode_ethernet_helper(ea, float_factory): logger.info("ETH PDU " + ipdu_name + " found") target_frame = canmatrix.Frame(name=ipdu_name) try: - target_frame.header_id = int(header_id.text) + target_frame.header_id = int(header_id.text, 0) except: try: - target_frame.header_id = int(pdu_triggering_header_id_map[ipdu_triggering]) + target_frame.header_id = int(pdu_triggering_header_id_map[ipdu_triggering], 0) except: target_frame.header_id = 0 # continue @@ -1773,13 +1814,13 @@ def decode_flexray_helper(ea, float_factory): frames = ea.findall("FLEXRAY-FRAME-TRIGGERING", pc) for frame_element in frames: frame_counter += 1 - slot_id = int(ea.get_child(frame_element, "SLOT-ID").text) + slot_id = int(ea.get_child(frame_element, "SLOT-ID").text, 0) base_cycle = ea.get_child(frame_element, "BASE-CYCLE").text ipdu_triggerings = ea.get_children(frame_element, "I-PDU-TRIGGERING") frame_repetition_cycle = ea.find_children_by_path(frame_element, "CYCLE-REPETITION/CYCLE-REPETITION")[ 0].text network_endpoints = pc.findall('.//' + ea.ns + "NETWORK-ENDPOINT") - frame_size = int(ea.find_children_by_path(frame_element, "FRAME/FRAME-LENGTH")[0].text) + frame_size = int(ea.find_children_by_path(frame_element, "FRAME/FRAME-LENGTH")[0].text, 0) frame = canmatrix.Frame(size=frame_size, arbitration_id=frame_counter) frame.slot_id = slot_id frame.base_cycle = base_cycle @@ -1789,7 +1830,7 @@ def decode_flexray_helper(ea, float_factory): ipdu_triggering_name = ea.get_element_name(ipdu_triggering) ipdu = ea.get_child(ipdu_triggering, "I-PDU") pdu_type = ea.get_child(ipdu_triggering, "I-PDU-REF").attrib["DEST"] - ipdu_length = int(ea.get_child(ipdu, "LENGTH").text) + ipdu_length = int(ea.get_child(ipdu, "LENGTH").text,0) pdu_port_type = ea.get_child(ipdu_triggering, "I-PDU-PORT-REF").attrib["DEST"] ipdu_name = ea.get_element_name(ipdu) target_pdu = canmatrix.Pdu(name=ipdu_name, size=ipdu_length, @@ -1817,10 +1858,16 @@ def decode_can_helper(ea, float_factory, ignore_cluster_info): db.add_ecu_defines("NWM-Stationsadresse", 'HEX 0 63') db.add_ecu_defines("NWM-Knoten", 'ENUM "nein","ja"') db.add_signal_defines("LongName", 'STRING') + db.add_signal_defines("CompuMethodName", 'STRING') + db.add_signal_defines("ISignalName", 'STRING') + db.add_signal_defines("SysSignalName", 'STRING') db.add_frame_defines("GenMsgDelayTime", 'INT 0 65535') db.add_frame_defines("GenMsgNrOfRepetitions", 'INT 0 65535') db.add_frame_defines("GenMsgStartValue", 'STRING') + db.add_frame_defines("FrameTriggeringName", 'STRING') + db.add_frame_defines("PduName", 'STRING') db.add_frame_defines("GenMsgStartDelayTime", 'INT 0 65535') + db.add_frame_defines( "GenMsgSendType", 'ENUM "cyclicX","spontanX","cyclicIfActiveX","spontanWithDelay","cyclicAndSpontanX","cyclicAndSpontanWithDelay","spontanWithRepitition","cyclicIfActiveAndSpontanWD","cyclicIfActiveFast","cyclicWithRepeatOnDemand","none"') @@ -1837,13 +1884,13 @@ def decode_can_helper(ea, float_factory, ignore_cluster_info): bus_name = ea.get_element_name(cc) if speed is not None: - db.baudrate = int(speed.text) + db.baudrate = int(speed.text, 0) elif baudrate_elem is not None: - db.baudrate = int(baudrate_elem.text) + db.baudrate = int(baudrate_elem.text, 0) logger.debug("Baudrate: " + str(db.baudrate)) if fd_baudrate_elem is not None: - db.fd_baudrate = int(fd_baudrate_elem.text) + db.fd_baudrate = int(fd_baudrate_elem.text, 0) can_frame_trig = ea.selector(cc, "/CAN-PHYSICAL-CHANNEL//CAN-FRAME-TRIGGERING") @@ -1851,16 +1898,16 @@ def decode_can_helper(ea, float_factory, ignore_cluster_info): for frameTrig in can_frame_trig: # type: _Element frame = get_frame(frameTrig, ea, multiplex_translation, float_factory, headers_are_littleendian) if frame is not None: - comm_direction = ea.selector(frameTrig, ">>FRAME-PORT-REF/COMMUNICATION-DIRECTION") - if len(comm_direction) > 0: - ecu_elem = ea.get_ecu_instance(element=comm_direction[0]) + comm_directions = ea.selector(frameTrig, ">>FRAME-PORT-REF/COMMUNICATION-DIRECTION") + for comm_direction in comm_directions: + ecu_elem = ea.get_ecu_instance(element=comm_direction) if ecu_elem is not None: if ecu_elem in nodes: ecu = nodes[ecu_elem] else: ecu = process_ecu(ecu_elem, ea) nodes[ecu_elem] = ecu - if comm_direction[0].text == "OUT": + if comm_direction.text == "OUT": frame.add_transmitter(ecu.name) else: frame.add_receiver(ecu.name) @@ -1936,7 +1983,6 @@ def load(file, **options): ea = Earxml() ea.open(file) - com_module = ea.get_short_name_path("/ActiveEcuC/Com") if com_module is not None and len(com_module) > 0: @@ -1958,6 +2004,88 @@ def load(file, **options): result.update(decode_can_helper(ea, float_factory, ignore_cluster_info)) + result = canmatrix.cancluster.CanCluster(result) + + def get_cluster_for_triggering(triggering): + while triggering != ea.root: + if triggering.tag.endswith("-CLUSTER"): + return triggering + triggering = triggering.getparent() + + def get_ecu_for_triggering(triggering): + phys_channel = None + while triggering != ea.root: + if triggering.tag.endswith("-CHANNEL"): + phys_channel = triggering + break + triggering = triggering.getparent() + + if phys_channel is not None: + ecu_name = "" + channel = ea.selector(phys_channel, ">COMMUNICATION-CONNECTOR-REF") + if len(channel) == 0: + return "" + ecu_instance = ea.get_parent_with_name(channel[0], "ECU-INSTANCE") + if ecu_instance is not None: + ecu_name = ea.get_short_name(ecu_instance) + return ecu_name + return "" + + + pdu_gateway_mappings = [] + for pdu_mapping in ea.selector(ea.root, "//I-PDU-MAPPING"): + source_triggering = ea.selector(pdu_mapping, ">SOURCE-I-PDU-REF") + target_triggering = ea.selector(pdu_mapping, ">TARGET-I-PDU-REF") + if len(source_triggering) == 0 or len(target_triggering) == 0: + continue + source_pdu = ea.selector(source_triggering[0], "/I-PDU-REF") + target_pdu = ea.selector(target_triggering[0], "/I-PDU-REF") + + if len(source_pdu) == 0 or len(target_pdu) == 0: + continue + source_cluster_name = ea.get_short_name_path_of_element(get_cluster_for_triggering(source_triggering[0])) + target_cluster_name = ea.get_short_name_path_of_element(get_cluster_for_triggering(target_triggering[0])) + ecu = get_ecu_for_triggering(source_triggering[0]) + source_type = "" + target_type = "" + if "DEST" in source_pdu[0].attrib: + source_type = source_pdu[0].attrib["DEST"] + if "DEST" in target_pdu[0].attrib: + target_type = target_pdu[0].attrib["DEST"] + + mapping_info = {"ecu": ecu, "source": source_pdu[0].text, "source_cluster": source_cluster_name, + "target": target_pdu[0].text, "target_cluster": target_cluster_name, + "target_type": target_type, "source_type": source_type} + + pdu_gateway_mappings.append(mapping_info) + + result.pdu_gateway(pdu_gateway_mappings) + + signal_gateway_mappings = [] + for signal_mapping in ea.selector(ea.root, "//I-SIGNAL-MAPPING"): + source_triggering = ea.selector(signal_mapping, ">SOURCE-SIGNAL-REF") + target_triggering = ea.selector(signal_mapping, ">TARGET-SIGNAL-REF") + if len(source_triggering) == 0 or len(target_triggering) == 0: + continue + source_signal = ea.selector(source_triggering[0], "/I-SIGNAL-REF") + target_signal = ea.selector(target_triggering[0], "/I-SIGNAL-REF") + if len(source_signal) == 0 or len(target_signal) == 0: + continue + source_cluster_name = ea.get_short_name_path_of_element(get_cluster_for_triggering(source_triggering[0])) + target_cluster_name = ea.get_short_name_path_of_element(get_cluster_for_triggering(target_triggering[0])) + ecu = get_ecu_for_triggering(source_triggering[0]) + source_type = "" + target_type = "" + if "DEST" in source_signal[0].attrib: + source_type = source_signal[0].attrib["DEST"] + if "DEST" in target_signal[0].attrib: + target_type = source_signal[0].attrib["DEST"] + mapping_info = {"ecu": ecu, "source": source_signal[0].text, "source_cluster": source_cluster_name, + "target": target_signal[0].text, "target_cluster": target_cluster_name, + "source_type": source_type, "target_type": target_type} + + signal_gateway_mappings.append(mapping_info) + result.signal_gateway(signal_gateway_mappings) return result diff --git a/src/canmatrix/formats/dbc.py b/src/canmatrix/formats/dbc.py index 3845326a..3aa1c157 100644 --- a/src/canmatrix/formats/dbc.py +++ b/src/canmatrix/formats/dbc.py @@ -91,7 +91,7 @@ def create_attribute_string(attribute, attribute_class, name, value, is_string): # type: (str, str, str, typing.Any, bool) -> str if is_string: value = '"' + value + '"' - elif not value: + elif value is None: value = '""' attribute_string = 'BA_ "' + attribute + '" ' + attribute_class + ' ' + name + ' ' + str(value) + ';\n' @@ -200,16 +200,20 @@ def dump(in_db, f, **options): output_names = collections.defaultdict(dict) # type: typing.Dict[canmatrix.Frame, typing.Dict[canmatrix.Signal, str]] for frame in db.frames: - # fix long frame names + # fix long frame names , warn if the frame name exceeds 32 characters if len(frame.name) > 32: frame.add_attribute("SystemMessageLongSymbol", frame.name) + logger.warning("Frame %s name exceeds 32 characters, consider updating the frame name within" + " character limit(Max 32 characters) ", frame.name) frame.name = frame.name[0:32] db.add_frame_defines("SystemMessageLongSymbol", "STRING") - # fix long signal names + # fix long signal names, warn if the signal name exceeds 32 characters for s in frame.signals: if len(s.name) > 32: s.add_attribute("SystemSignalLongSymbol", s.name) + logger.warning("Signal %s::%s name exceeds 32 characters, consider updating the signal name " + "within the character limit(Max 32 characters)", frame.name, s.name) s.name = s.name[0:32] db.add_signal_defines("SystemSignalLongSymbol", "STRING") @@ -233,8 +237,13 @@ def dump(in_db, f, **options): for signal in frame.signals: if signal.cycle_time != 0: signal.add_attribute("GenSigCycleTime", signal.cycle_time) - if signal.phys2raw(None) != 0: - signal.add_attribute("GenSigStartValue", signal.phys2raw(None)) + if "GenSigStartValue" in db.signal_defines: + if signal.phys2raw(None) != 0: + if db.signal_defines["GenSigStartValue"].defaultValue is not None and \ + float(signal.initial_value) != float(db.signal_defines["GenSigStartValue"].defaultValue): + signal.add_attribute("GenSigStartValue", signal.phys2raw(float(db.signal_defines["GenSigStartValue"].defaultValue))) + elif db.signal_defines["GenSigStartValue"].defaultValue is None: + signal.add_attribute("GenSigStartValue", signal.phys2raw(None)) name = normalized_names[signal] if compatibility: @@ -414,13 +423,7 @@ def dump(in_db, f, **options): # signal-values: for frame in db.frames: - multiplex_written = False for signal in frame.signals: - if signal.multiplex == 'Multiplexor' and multiplex_written: - continue - - multiplex_written = True - if signal.values: f.write( ('VAL_ %d ' % @@ -543,7 +546,7 @@ def add_frame_by_id(new_frame): # type: (canmatrix.Frame) -> None continue decoded = l.decode(dbc_import_encoding).strip() if decoded.startswith("BO_ "): - regexp = re.compile(r"^BO_ ([^\ ]+) ([^\ ]+) *: ([^\ ]+) ([^\ ]+)") + regexp = re.compile(r"^BO_ ([^\ ]+) ([^\ ]+) *: *([^\ ]+) ([^\ ]+)") temp = regexp.match(decoded) # db.frames.addFrame(Frame(temp.group(1), temp.group(2), temp.group(3), temp.group(4))) frame = canmatrix.Frame(temp.group(2), arbitration_id=int(temp.group(1)), @@ -952,7 +955,12 @@ def add_frame_by_id(new_frame): # type: (canmatrix.Frame) -> None # frame.extended = 1 for signal in frame.signals: - gen_sig_start_value = float_factory(signal.attributes.get("GenSigStartValue", "0")) + if "GenSigStartValue" in db.signal_defines \ + and db.signal_defines["GenSigStartValue"].defaultValue is not None: + default_value = signal.phys2raw(float_factory(db.signal_defines["GenSigStartValue"].defaultValue)) + else: + default_value = signal.phys2raw(None) + gen_sig_start_value = float_factory(signal.attributes.get("GenSigStartValue", default_value)) signal.initial_value = (gen_sig_start_value * signal.factor) + signal.offset signal.cycle_time = int(signal.attributes.get("GenSigCycleTime", 0)) if signal.attribute("SystemSignalLongSymbol") is not None: diff --git a/src/canmatrix/formats/fibex.py b/src/canmatrix/formats/fibex.py index aba8ce35..e8158475 100644 --- a/src/canmatrix/formats/fibex.py +++ b/src/canmatrix/formats/fibex.py @@ -29,10 +29,15 @@ import typing from builtins import * - import lxml.etree - +import logging import canmatrix +import re +import decimal + +clusterImporter = 1 + +logger = logging.getLogger(__name__) fx = "http://www.asam.net/xml/fbx" ho = "http://www.asam.net/xml" @@ -73,6 +78,257 @@ def create_sub_element_ho(parent, element_name, element_text=None): return new +class Fe: + def __init__(self, filename): + self.tree = lxml.etree.parse(filename) + self.root = self.tree.getroot() # type: _Element + + self.ns = "{" + self.tree.xpath('namespace-uri(.)') + "}" # type: str + self.nsp = self.tree.xpath('namespace-uri(.)') + self.ans = "{http://www.asam.net/xml}" + self._id_cache = {a.attrib["ID"]:a for a in self.root.xpath(".//*[@ID]")} + self._id_rev_cache = {} + for referencer in self.root.xpath(".//*[@ID-REF]"): + ref_id = referencer.attrib["ID-REF"] + if ref_id not in self._id_rev_cache: + self._id_rev_cache[ref_id] = [referencer] + else: + self._id_rev_cache[ref_id].append(referencer) + + def sn(self, tag): + sn = tag.find("./" + self.ans + "SHORT-NAME").text + return sn + + def get_referencable_parent(self, xml_element): + path = "" + while xml_element != self.root: + try: + current_short_name = self.sn(xml_element) + return xml_element + except AttributeError: + pass + xml_element = xml_element.getparent() + return xml_element + + def find_parent(self, start_element, parent_tag): + while start_element != self.root: + if start_element.tag == self.ns + parent_tag or start_element == self.ans + parent_tag: + return start_element + start_element = start_element.getparent() + return None + + def get_desc_or_longname(self, element): + long_name_elements = self.selector(element, "./!LONG-NAME") + if len(long_name_elements) > 0: + return long_name_elements[0].text + + desc_elements = self.selector(element, "./!DESC") + if len(desc_elements) > 0: + return desc_elements[0].text + + return "" + + def selector(self, start_element, selector): + start_pos = 0 + token = "" + result_list = [start_element] + last_found_token = 0 + while start_pos < len(selector): + token_match = re.search(r'//|/!|/|!|>>|>!|<|$', selector[start_pos:]) + found_token = token_match.span() + if start_pos > 0: # at least one Token found... + value = selector[last_found_token:start_pos + found_token[0]] + if token == "//": + result_list = [c for a in result_list for c in a.findall(".//" + self.ns + value)] + elif token == "/!": + result_list = [c for a in result_list for c in a.findall(".//" + self.ans + value)] + elif token == "/": + if value == "..": + result_list = [a.getparent() for a in result_list] + else: + result_list = [a.find("./" + self.ns + value) for a in result_list] + elif token == "!": + result_list = [a.find("./" + self.ans + value) for a in result_list] + elif token == ">": + start_points = [a.find("./" + self.ns + value).attrib["ID-REF"] for a in result_list] + result_list = [self._id_cache[a] for a in start_points] + elif token == "<<": + id_list = [a.attrib["ID"] for a in result_list] + result_list = [b for a in id_list for b in self._id_rev_cache[a]] + result_list = [a for a in result_list if self.get_referencable_parent(a) is not None and self.get_referencable_parent(a).tag.endswith(value)] + elif token == "<>": + start_points = [c.attrib["ID-REF"] for a in result_list for c in a.findall(".//" + self.ns + value)] + result_list = [self._id_cache[a] for a in start_points] + elif token == ">!": + start_points = [c.attrib["ID-REF"] for a in result_list for c in a.findall(".//" + self.ans + value)] + result_list = [self._id_cache[a] for a in start_points] + elif token == ">>": + start_points = [c.attrib["ID-REF"] for a in result_list for c in a.findall(".//" + self.ns + value)] + result_list = [self._id_cache[a] for a in start_points] + elif token == "^": + result_list = [self.find_parent(a, value) for a in result_list if a is not None and self.find_parent(a, value) is not None] + + result_list = [a for a in result_list if a is not None] + last_found_token = found_token[1] + start_pos + token = selector[start_pos + found_token[0]:start_pos + found_token[1]] + start_pos += found_token[1] + return sorted(result_list, key=lambda element: element.sourceline) + + +def get_signals_for_pdu(fe, pdu, overall_startbit = 0): + signals = [] + ecus = [] + for signal_instance in fe.selector(pdu, "//SIGNAL-INSTANCE"): + byte_order_element = fe.selector(signal_instance, "/IS-HIGH-LOW-BYTE-ORDER") + if byte_order_element[0].text == "false": + is_little_endian = True + else: + is_little_endian = False + + start_bit = int(fe.selector(signal_instance, "/BIT-POSITION")[0].text, 0) + overall_startbit + signal = fe.selector(signal_instance, ">SIGNAL-REF")[0] + ecu_instance_refs = fe.selector(signal_instance, "< 0: + ecu_name = fe.sn(fe.get_referencable_parent(ecu_instance_ref)) + receiver_ecus.append(ecu_name) + ecus.append(canmatrix.Ecu(name=ecu_name.strip())) + + signal_name = fe.sn(signal) + coding = fe.selector(signal, ">CODING-REF")[0] + bit_length = int(fe.selector(coding, "/!BIT-LENGTH")[0].text) + compu_methods = fe.selector(coding, "/!COMPU-METHOD") + sig = canmatrix.Signal(name=signal_name) + + for compu_method in compu_methods: + category = fe.selector(compu_method, "/!CATEGORY") + if len(category) > 0 and category[0].text == "LINEAR": + numerator = fe.selector(compu_method, "/!COMPU-NUMERATOR")[0] + denominator = fe.selector(compu_method, "/!COMPU-DENOMINATOR")[0] + teiler = decimal.Decimal(fe.selector(denominator, "/!V")[0].text) + [offset, factor] = [decimal.Decimal(a.text) for a in fe.selector(numerator, "/!V")] + [offset, factor] = [a / teiler for a in [offset, factor]] + sig.offset = offset + sig.factor = factor + try: + sig.min = decimal.Decimal(fe.selector(compu_method, "!PHYS-CONSTRS!LOWER-LIMIT")[0].text) + sig.max = decimal.Decimal(fe.selector(compu_method, "!PHYS-CONSTRS!UPPER-LIMIT")[0].text) + except: + pass + unit = fe.selector(compu_method, ">!UNIT-REF") + if len(unit) > 0: + try: + sig.unit = fe.selector(unit[0], "!DISPLAY-NAME")[0].text + except: + pass + elif len(category) > 0 and category[0].text == "TEXTTABLE": + for compu_scale in fe.selector(compu_method, "/!COMPU-SCALE"): + try: + value_name = fe.selector(compu_scale, "!COMPU-CONST!VT")[0].text + except IndexError: + value_name = fe.get_desc_or_longname(compu_scale) + value_value = fe.selector(compu_scale, "!LOWER-LIMIT")[0].text + sig.add_values(value_value, value_name) + sig.is_little_endian = is_little_endian + if not sig.is_little_endian: + sig.set_startbit(start_bit, bitNumbering=1) + else: + sig.start_bit = start_bit + sig.size = bit_length + sig.receivers = list(set(receiver_ecus)) + + sig.add_comment(fe.get_desc_or_longname(signal)) + signals.append(sig) + return signals, ecus + + +def load(f, **_options): + fe = Fe(f) + result = {} + sig_group_counter = 0 + + clusters = fe.selector(fe.root, "//CLUSTER") + names = [fe.sn(a) for a in clusters] + logger.info("Found clusters: " + ",".join(names)) + + for cluster in clusters: + if "CAN" not in fe.selector(cluster, "//PROTOCOL")[0].text: + logger.info(fe.sn(cluster) + " seems not to be a CAN cluster - ignoring") + continue + + db = canmatrix.CanMatrix() + result[fe.sn(cluster)] = db + channels = fe.selector(cluster, ">>CHANNEL-REF") + for channel in channels: + for ft in fe.selector(channel, "//FRAME-TRIGGERING"): + ports = fe.selector(ft, "<FRAME-REF")[0] + frame_size = int(fe.selector(frame_element, "/BYTE-LENGTH")[0].text) + + pdu_instances = fe.selector(frame_element, "//PDU-INSTANCE") + + if len(pdu_instances) > 1: + frame_name = fe.sn(frame_element) + frame = canmatrix.Frame(name=frame_name) + for pdu_instance in pdu_instances: + pdu = fe.selector(pdu_instance, ">PDU-REF")[0] + pdu_startbit_position = int(fe.selector(pdu_instance, "/BIT-POSITION")[0].text, 0) + signals, ecus = get_signals_for_pdu(fe, pdu, pdu_startbit_position) + for sig in signals: + frame.add_signal(sig) + for ecu in ecus: + db.add_ecu(ecu) + + frame.add_signal_group(fe.sn(pdu), sig_group_counter, [sig.name for sig in signals]) + sig_group_counter += 1 + else: + pdu = fe.selector(pdu_instances[0], ">PDU-REF")[0] + frame_name = fe.sn(pdu) + frame = canmatrix.Frame(name=frame_name) + + signals, ecus = get_signals_for_pdu(fe, pdu) + for sig in signals: + frame.add_signal(sig) + for ecu in ecus: + db.add_ecu(ecu) + + # fe.selector(pdu, "< 0: + pdu_triggerings = fe.selector(output_ports[0], ">>PDU-TRIGGERING-REF") + if len(pdu_triggerings) > 0: + cyclic_timing_element = fe.selector(pdu_triggerings[0], + "/TIMINGS/CYCLIC-TIMING/REPEATING-TIME-RANGE/VALUE") + if len(cyclic_timing_element) > 0: + time_value_string = cyclic_timing_element[0].text + if time_value_string.startswith("PT") and time_value_string.endswith("S"): + frame.cycle_time = decimal.Decimal(time_value_string[2:-1])*1000 + frame.transmitters = [fe.sn(a) for a in sending_ecus] + for ecu_element in sending_ecus: + ecu_name = fe.sn(ecu_element) + cm_ecu = canmatrix.Ecu(ecu_name) + cm_ecu.add_comment(fe.get_desc_or_longname(ecu_element)) + db.add_ecu(cm_ecu) + frame.arbitration_id = canmatrix.ArbitrationId(extended=extended, id=arbitration_id) + + frame.add_comment(fe.get_desc_or_longname(pdu)) + if "CAN-FD" in [a.text for a in + fe.selector(ft, "//CAN-FRAME-TX-BEHAVIOR") + fe.selector(ft, "//CAN-FRAME-RX-BEHAVIOR")]: + frame.is_fd = True + + db.add_frame(frame) + return result + + def dump(db, f, **options): # type: (canmatrix.CanMatrix, typing.IO, **typing.Any) -> None ns_map = {"fx": fx, "ho": ho, "can": can, "xsi": xsi} @@ -92,7 +348,7 @@ def dump(db, f, **options): # ELEMENTS # elements = create_sub_element_fx(root, "ELEMENTS") - + # # CLUSTERS # @@ -101,7 +357,7 @@ def dump(db, f, **options): cluster.set('ID', 'canCluster1') create_short_name_desc(cluster, "clusterShort", "clusterDesc") create_sub_element_fx(cluster, "SPEED", "500") - create_sub_element_fx(cluster, "IS-HIGH-LOW-BIT-ORDER", "false") + create_sub_element_fx(cluster, "IS-HIGH-LOW-BIT-ORDER", "true") create_sub_element_fx(cluster, "BIT-COUNTING-POLICY", "MONOTONE") protocol = create_sub_element_fx(cluster, "PROTOCOL", "CAN") protocol.attrib['{{{pre}}}type'.format(pre=xsi)] = "can:PROTOCOL-TYPE" @@ -117,7 +373,24 @@ def dump(db, f, **options): channels = create_sub_element_fx(elements, "CHANNELS") channel = create_sub_element_fx(channels, "CHANNEL") # for each channel + channel.set('ID', 'CANCHANNEL01') create_short_name_desc(channel, "CANCHANNEL01", "Can Channel Description") + + # for pdu triggerings + pdu_triggerings = create_sub_element_fx(channel, "PDU-TRIGGERINGS") + for pdu in db.frames: + pdu_triggering = create_sub_element_fx( + pdu_triggerings, "PDU-TRIGGERING") + pdu_triggering.set("ID", "PDU_" + pdu.name) + pdu_timings = create_sub_element_fx(pdu_triggering, "TIMINGS") + if pdu.cycle_time > 0: + cyclic_timing = create_sub_element_fx(pdu_timings, "CYCLIC-TIMING") + repeating_time_range = create_sub_element_fx(cyclic_timing, "REPEATING-TIME-RANGE") + create_sub_element_fx(repeating_time_range, "VALUE", "PT" + str(pdu.cycle_time/1000.0) + "S") + + pdu_ref = create_sub_element_fx(pdu_triggering, "PDU-REF") + pdu_ref.set("ID-REF", "PDU_" + pdu.name) + frame_triggerings = create_sub_element_fx(channel, "FRAME-TRIGGERINGS") for frame in db.frames: frame_triggering = create_sub_element_fx( @@ -139,16 +412,32 @@ def dump(db, f, **options): function_refs = create_sub_element_fx(ecu, "FUNCTION-REFS") func_ref = create_sub_element_fx(function_refs, "FUNCTION-REF") func_ref.set("ID-REF", "FCT_" + bu.name) - + + controllers = create_sub_element_fx(ecu, "CONTROLLERS") + controller = create_sub_element_fx(controllers, "CONTROLLER") + create_short_name_desc(controller, bu.name, bu.comment) + controller.set('ID', 'Controller_' + bu.name) + connectors = create_sub_element_fx(ecu, "CONNECTORS") connector = create_sub_element_fx(connectors, "CONNECTOR") - + connector.set('ID', 'Connector' + bu.name) + channel_ref = create_sub_element_fx(connector, "CHANNEL-REF") + channel_ref.set("ID-REF", "CANCHANNEL01") + controller_ref = create_sub_element_fx(connector, "CONTROLLER-REF") + controller_ref.set("ID-REF", 'Controller_' + bu.name) inputs = create_sub_element_fx(connector, "INPUTS") for frame in db.frames: if bu.name in frame.receivers: input_port = create_sub_element_fx(inputs, "INPUT-PORT") frame_triggering_ref = create_sub_element_fx(input_port, "FRAME-TRIGGERING-REF") frame_triggering_ref.set("ID-REF", "FT_" + frame.name) + # Reference to PDUs + included_pdus = create_sub_element_fx(input_port, "INCLUDED-PDUS") + included_pdu = create_sub_element_fx(included_pdus, "INCLUDED-PDU") + included_pdu.set('ID', 'input_included_pdu_' + frame.name) + pdu_triggering_ref = create_sub_element_fx(included_pdu, "PDU-TRIGGERING-REF") + pdu_triggering_ref.set("ID-REF", "PDU_" + frame.name) + outputs = create_sub_element_fx(connector, "OUTPUTS") for frame in db.frames: @@ -156,6 +445,12 @@ def dump(db, f, **options): input_port = create_sub_element_fx(outputs, "OUTPUT-PORT") frame_triggering_ref = create_sub_element_fx(input_port, "FRAME-TRIGGERING-REF") frame_triggering_ref.set("ID-REF", "FT_" + frame.name) + # Reference to PDUs + included_pdus = create_sub_element_fx(input_port, "INCLUDED-PDUS") + included_pdu = create_sub_element_fx(included_pdus, "INCLUDED-PDU") + included_pdu.set('ID', 'output_included_pdu_' + frame.name) + pdu_triggering_ref = create_sub_element_fx(included_pdu, "PDU-TRIGGERING-REF") + pdu_triggering_ref.set("ID-REF", "PDU_" + frame.name) # ignore CONTROLERS/CONTROLER @@ -311,6 +606,26 @@ def dump(db, f, **options): create_sub_element_ho(compu_denominator, "V", "1") # nenner # defaultValue = create_sub_element_ho(compuInternalToPhys,"COMPU-DEFAULT-VALUE") + # CAN signal interpretation + if signal.values: + compu_method = create_sub_element_ho(compu_methods, "COMPU-METHOD") + create_sub_element_ho(compu_method, "SHORT-NAME", + "SIGNAL_INTERPRETATION_" + signal.name) + create_sub_element_ho(compu_method, "CATEGORY", "TEXTTABLE") + compu_int_to_phys = create_sub_element_ho(compu_method, "COMPU-INTERNAL-TO-PHYS") + compu_scales = create_sub_element_ho(compu_int_to_phys, "COMPU-SCALES") + + for value, text in signal.values.items(): + compu_scale = create_sub_element_ho(compu_scales, "COMPU-SCALE") + + lower_limit = create_sub_element_ho(compu_scale, "LOWER-LIMIT", str(value)) + lower_limit.set("INTERVAL-TYPE", "CLOSED") + upper_limit = create_sub_element_ho(compu_scale, "UPPER-LIMIT", str(value)) + upper_limit.set("INTERVAL-TYPE", "CLOSED") + + compu_const = create_sub_element_ho(compu_scale, "COMPU-CONST") + create_sub_element_ho(compu_const, "VT", text) + # # REQUIREMENTS # diff --git a/src/canmatrix/formats/json.py b/src/canmatrix/formats/json.py index e32c8662..bffc4330 100644 --- a/src/canmatrix/formats/json.py +++ b/src/canmatrix/formats/json.py @@ -30,7 +30,7 @@ import sys import typing from builtins import * - +import decimal import canmatrix @@ -104,6 +104,20 @@ def dump(db, f, **options): symbolic_frame["attributes"] = frame_attributes export_dict['messages'].append(symbolic_frame) else: # export_all + _define_mapping = {"signal_defines": db.signal_defines, "frame_defines": db.frame_defines, + "global_defines": db.global_defines, "env_defines": db.env_defines, "ecu_defines": db.ecu_defines} + for define_type in _define_mapping: + export_dict[define_type] = [{"name": a, + "define": _define_mapping[define_type][a].definition, + "default": _define_mapping[define_type][a].defaultValue, + "type": _define_mapping[define_type][a].type} for a in _define_mapping[define_type]] + export_dict['ecus'] = {ecu.name: ecu.comment for ecu in db.ecus} + export_dict['attributes'] = db.attributes + export_dict['value_tables'] = db.value_tables + export_dict['env_vars'] = db.env_vars + export_dict['baudrate'] = db.baudrate + export_dict['fd_baudrate'] = db.fd_baudrate + for frame in db.frames: frame_attributes = {attribute: frame.attribute(attribute, db=db) for attribute in db.frame_defines} symbolic_signals = [] @@ -132,10 +146,13 @@ def dump(db, f, **options): "is_signed": signal.is_signed, "is_float": signal.is_float, "comment": signal.comment, + "comments": signal.comments, "attributes": attributes, + "initial_value": number_converter(signal.initial_value), "values": values, - "is_multiplexer" : signal.is_multiplexer, - "mux_value" : signal.mux_val + "is_multiplexer": signal.is_multiplexer, + "mux_value": signal.mux_val, + "receivers": signal.receivers, } if signal.multiplex is not None: symbolic_signal["multiplex"] = signal.multiplex @@ -156,7 +173,14 @@ def dump(db, f, **options): "signals": symbolic_signals, "attributes": frame_attributes, "comment": frame.comment, - "length": frame.size}) + "length": frame.size, + "is_complex_multiplexed": frame.is_complex_multiplexed, + "mux_names": frame.mux_names, + "cycle_time": frame.cycle_time, + "is_j1939": frame.is_j1939, + "header_id": frame.header_id, + "pdu_name": frame.pdu_name, + "transmitters": frame.transmitters}) if sys.version_info > (3, 0): import io temp = io.TextIOWrapper(f, encoding='UTF-8') @@ -192,15 +216,36 @@ def load(f, **_options): key = int(key) db.value_tables.setdefault(val_tab_name, {})[key] = val + if "ecus" in json_data: + for ecu in json_data["ecus"]: + new_ecu = canmatrix.Ecu(name=ecu, comment=json_data["ecus"][ecu]) + db.add_ecu(new_ecu) if "messages" in json_data: for frame in json_data["messages"]: # new_frame = Frame(frame["id"],frame["name"],8,None) - new_frame = canmatrix.Frame(frame["name"], arbitration_id=frame["id"], size=8) + arb_id = canmatrix.canmatrix.ArbitrationId(id=frame["id"], extended=frame.get("is_extended_frame", "False")) + new_frame = canmatrix.Frame(frame["name"], arbitration_id=arb_id, size=8) if "length" in frame: new_frame.size = frame["length"] + simple_mapping = ["is_complex_multiplexed", "mux_names", "cycle_time", "is_j1939", "header_id", "pdu_name"] + for key in simple_mapping: + if key in frame: + if key == "is_complex_multiplexed": + new_frame.is_complex_multiplexed = frame[key] + elif key == "mux_names": + new_frame.mux_names = frame[key] + elif key == "cycle_time": + new_frame.cycle_time = frame[key] + elif key == "is_j1939": + new_frame.is_j1939 = frame[key] + elif key == "header_id": + new_frame.header_id = frame[key] + elif key == "pdu_name": + new_frame.pdu_name = frame[key] new_frame.arbitration_id.extended = frame.get("is_extended_frame", False) - + if "transmitters" in frame: + new_frame.transmitters = frame["transmitters"] for signal in frame["signals"]: is_little_endian = not signal.get("is_big_endian", False) is_float = signal.get("is_float", False) @@ -232,10 +277,52 @@ def load(f, **_options): if signal.get("values", False): for key in signal["values"]: new_signal.add_values(key, signal["values"][key]) + + if signal.get("attributes", False): + for key in signal["attributes"]: + new_signal.add_attribute(key, signal["attributes"][key]) + + if "comment" in signal: + new_signal.comment = signal["comment"] + + if "comments" in signal: + new_signal.comments = signal["comments"] + + if "initial_value" in signal: + new_signal.initial_value = decimal.Decimal(signal["initial_value"]) + + if signal.get("receivers", False): + for ecu in signal["receivers"]: + new_signal.add_receiver(ecu) if new_signal.is_little_endian is False: new_signal.set_startbit( new_signal.start_bit, bitNumbering=1, startLittle=True) new_frame.add_signal(new_signal) db.add_frame(new_frame) + + _define_list = {"signal_defines": db.add_signal_defines, "frame_defines": db.add_frame_defines, + "global_defines": db.add_global_defines, "env_defines": db.add_env_defines, + "ecu_defines": db.add_ecu_defines} + for define_type, fptr in _define_list.items(): + if define_type in json_data: + for define in json_data[define_type]: + fptr(define['name'], define['define']) + + cm_import_list_dict = {'attributes': db.attributes, 'value_tables': db.value_tables, 'env_vars': db.env_vars} + + cm_import_list_val = ['baudrate', 'fd_baudrate'] + + for key in cm_import_list_dict: + if key in json_data: + cm_import_list_dict[key].update(json_data[key]) + + for key in cm_import_list_val: + if key in json_data: + if key == 'baudrate': + db.baudrate = json_data[key] + elif key == 'fd_baudrate': + db.fd_baudrate = json_data[key] + f.close() + db.update_ecu_list() return db diff --git a/src/canmatrix/formats/ldf.py b/src/canmatrix/formats/ldf.py index e3932196..c239724e 100644 --- a/src/canmatrix/formats/ldf.py +++ b/src/canmatrix/formats/ldf.py @@ -6,9 +6,10 @@ def load(f, **options): # type: (typing.IO, **typing.Any) -> canmatrix.CanMatrix - ldf = ldfparser.parseLDF(path=f.name) # using f.name is not nice, but works + ldf = ldfparser.parse_ldf(path=f.name) # using f.name is not nice, but works db = canmatrix.CanMatrix() + db.baudrate = ldf.get_baudrate() for lin_frame in ldf.frames: cm_frame = canmatrix.Frame() @@ -20,6 +21,7 @@ def load(f, **options): # type: (typing.IO, **typing.Any) -> canmatrix.CanMatri for mapping in lin_frame.signal_map: lin_signal = mapping[1] cm_signal = canmatrix.Signal() + cm_signal.is_signed = False if lin_signal.name in ldf.converters: for converter in ldf.converters[lin_signal.name]._converters: if isinstance(converter, ldfparser.encoding.LogicalValue): diff --git a/src/canmatrix/formats/scapy.py b/src/canmatrix/formats/scapy.py index 3124abad..7a5c47d4 100644 --- a/src/canmatrix/formats/scapy.py +++ b/src/canmatrix/formats/scapy.py @@ -64,7 +64,7 @@ def dump(db, f, **options): # type: (canmatrix.CanMatrix, typing.IO, **typing.A for frame in db.frames: scapy_decoder += "class " + frame.name + "(SignalPacket):\n" - scapy_decoder += " fields_desc = [ \n" + scapy_decoder += " fields_desc = [\n" if frame.is_multiplexed and not frame.is_complex_multiplexed: multiplexer = frame.get_multiplexer @@ -83,7 +83,7 @@ def dump(db, f, **options): # type: (canmatrix.CanMatrix, typing.IO, **typing.A for frame in db.frames: if frame.arbitration_id.extended: scapy_decoder += "bind_layers(SignalHeader, " + frame.name + ", identifier = " + hex( - frame.arbitration_id.id) + ", flags = extended)\n" + frame.arbitration_id.id) + ", flags = \"extended\")\n" else: scapy_decoder += "bind_layers(SignalHeader, " + frame.name + ", identifier = " + hex( frame.arbitration_id.id) + ")\n" diff --git a/src/canmatrix/formats/xlsx.py b/src/canmatrix/formats/xlsx.py index c5d1f33a..7fa8d890 100644 --- a/src/canmatrix/formats/xlsx.py +++ b/src/canmatrix/formats/xlsx.py @@ -217,7 +217,12 @@ def dump(db, filename, **options): # sort signals: sig_hash = {} for sig in frame.signals: - sig_hash["%02d" % int(sig.get_startbit()) + sig.name] = sig + if motorola_bit_format == "msb": + sig_hash["%03d" % int(sig.get_startbit(bit_numbering=1)) + sig.name] = sig + elif motorola_bit_format == "msbreverse": + sig_hash["%03d" % int(sig.get_startbit()) + sig.name] = sig + else: # motorolaBitFormat == "lsb" + sig_hash["%03d" % int(sig.get_startbit(bit_numbering=1, start_little=True)) + sig.name] = sig # set style for first line with border signal_style = sty_first_frame diff --git a/src/canmatrix/formats/yaml.py b/src/canmatrix/formats/yaml.py index c62b005b..972c7f6a 100644 --- a/src/canmatrix/formats/yaml.py +++ b/src/canmatrix/formats/yaml.py @@ -74,7 +74,7 @@ def dump(db, f, **options): # type: (canmatrix.CanMatrix, typing.IO, **typing.A def load(f, **options): # type: (typing.IO, **typing.Any) -> canmatrix.CanMatrix __init_yaml() - db = yaml.load(f) + db = yaml.safe_load(f) return db diff --git a/src/canmatrix/tests/ARXMLCompuMethod1.arxml b/src/canmatrix/tests/ARXMLCompuMethod1.arxml new file mode 100644 index 00000000..dab5874e --- /dev/null +++ b/src/canmatrix/tests/ARXMLCompuMethod1.arxml @@ -0,0 +1,42 @@ + + + + + New_Frame_NewPDU_NewSignal_Encoding + SCALE_LINEAR_AND_TEXTTABLE + + + + + 0 + 0 + + no trailer detected + + + + 1 + 1 + + trailer detected + + + + TrailerPresence + 0 + 0 + + + 17 + 42 + + + 1 + + + + + + + + diff --git a/src/canmatrix/tests/test_arxml.py b/src/canmatrix/tests/test_arxml.py index 9ac336e7..b39c027b 100644 --- a/src/canmatrix/tests/test_arxml.py +++ b/src/canmatrix/tests/test_arxml.py @@ -27,14 +27,25 @@ def test_get_signals_from_container_i_pdu(): assert matrix["New_CanCluster"].frames[0].pdus[0].signals[0].attributes["SysSignalName"] == 'PDU_Contained_1_Signal1_905db81da40081cb' - def test_get_signals_from_secured_pdu(): here = Path(__file__).parent matrix = canmatrix.formats.arxml.load(str(here / "ARXMLSecuredPDUTest.arxml")) assert matrix["CAN"].frames[0].signals[0].name == 'someTestSignal' assert matrix["CAN"].frames[0].signals[1].name == 'Signal' + def test_min_max(): here = Path(__file__).parent matrix = canmatrix.formats.arxml.load(str(here / "ARXML_min_max.arxml")) - assert matrix["New_CanCluster"].frames[0].signals[0].is_signed == False + assert matrix["New_CanCluster"].frames[0].signals[0].is_signed is False + + +def test_decode_compu_method_1(): + here = Path(__file__).parent + ea = canmatrix.formats.arxml.Earxml() + ea.open(str(here / "ARXMLCompuMethod1.arxml")) + compu_method = ea.find("COMPU-METHOD") + values, factor, offset, unit, const = canmatrix.formats.arxml.decode_compu_method(compu_method, ea, float) + assert values == {'0': 'no trailer detected', '1': 'trailer detected'} + assert factor == 42 + assert offset == 17 diff --git a/src/canmatrix/tests/test_arxml_gw.py b/src/canmatrix/tests/test_arxml_gw.py new file mode 100644 index 00000000..8f3ee7af --- /dev/null +++ b/src/canmatrix/tests/test_arxml_gw.py @@ -0,0 +1,116 @@ +import canmatrix.formats.arxml +import textwrap +import io + + +def test_pdu_gateway(): + arxml = io.BytesIO(textwrap.dedent(u'''\ + + + + Cluster + + + someCluster + + + + + CHNL + + + SomeFrameTriggering + + + /Cluster/someCluster/CHNL/somePDUTriggering + + + + + + + someSignalTriggering + /ISignal/someSignal + + + + + somePduTriggering + /PDU/somePdu + + + + + + + + + + someOtherCluster + + + + + CHNL + + + SomeOtherFrameTriggering + + + /Cluster/someCluster/CHNL/someOtherPDUTrigering + + + + + + + someOtherSignalTriggering + /ISignal/someOtherSignal + + + + + someOtherPduTriggering + /PDU/someOtherPdu + + + + + + + + + + + + + Gateway + + + someECU + /ECU/someECU + + + /Cluster/someCluster/CHNL/somePduTriggering + + /Cluster/someOtherCluster/CHNL/someOtherPduTriggering + + + + + + /Cluster/someCluster/CHNL/someSignalTriggering + /Cluster/someOtherCluster/CHNL/someOtherSignalTriggering + + + + + + ''').encode('utf-8')) + + cluster = canmatrix.formats.arxml.load(arxml) + + assert cluster.get_pdu_routing_info("someOtherPdu")["target"][0]["pdu"] == "/PDU/somePdu" + assert cluster.get_pdu_routing_info("/PDU/somePdu", strict_search=True)["source"][0]["pdu"] == "/PDU/someOtherPdu" + assert cluster.get_signal_routing_info("someSignal")["source"][0]["signal"] == '/ISignal/someOtherSignal' + assert cluster.get_signal_routing_info("/ISignal/someOtherSignal")["target"][0]["signal"] == '/ISignal/someSignal' diff --git a/src/canmatrix/tests/test_canmatrix.py b/src/canmatrix/tests/test_canmatrix.py index d15433e5..3c1e6330 100644 --- a/src/canmatrix/tests/test_canmatrix.py +++ b/src/canmatrix/tests/test_canmatrix.py @@ -122,7 +122,7 @@ def test_decode_signal(): def test_ecu_find_attribute(): ecu = canmatrix.canmatrix.Ecu(name="Gateway") ecu.add_attribute("attr1", 255) - assert ecu.attribute("attr1") == 255 + assert ecu.attribute("attr1") == '255' def test_ecu_no_attribute(): @@ -159,7 +159,7 @@ def test_signal_find_mandatory_attribute(some_signal): def test_signal_find_optional_attribute(some_signal): some_signal.add_attribute("attr1", 255) - assert some_signal.attribute("attr1") == 255 + assert some_signal.attribute("attr1") == '255' def test_signal_no_attribute(some_signal): @@ -213,7 +213,7 @@ def test_signal_delete_wrong_attribute_doesnt_raise(some_signal): def test_signal_spn(some_signal): assert some_signal.spn is None some_signal.add_attribute("SPN", 10) - assert some_signal.spn == 10 + assert some_signal.spn == '10' def test_signal_set_startbit(): @@ -276,7 +276,7 @@ def test_signal_decode_named_value(some_signal): some_signal.add_values(255, "Init") some_signal.add_values(254, "Error") assert some_signal.raw2phys(254, decode_to_str=True) == "Error" - assert some_signal.raw2phys(200, decode_to_str=True) == 200 + assert some_signal.raw2phys(300, decode_to_str=True) == 450 def test_signal_encode_named_value(some_signal): @@ -383,7 +383,7 @@ def the_group(): @pytest.fixture def some_signal(): - return canmatrix.canmatrix.Signal(name="speed", size=8) + return canmatrix.canmatrix.Signal(name="speed", size=8, factor=1.5) def test_signalgroup_empty(the_group): @@ -854,7 +854,7 @@ def test_decoded_signal_phys_value(some_signal): def test_decoded_signal_named_value(): signal = canmatrix.canmatrix.Signal(factor="0.1", values={10: "Init"}) - decoded = canmatrix.canmatrix.DecodedSignal(100, signal) + decoded = canmatrix.canmatrix.DecodedSignal(10, signal) assert decoded.named_value == "Init" @@ -1092,3 +1092,27 @@ def test_baudrate(): assert cm.baudrate == 500000 cm.fd_baudrate = 1000000 assert cm.fd_baudrate == 1000000 + +def test_frame_compress(): + frame = canmatrix.Frame("my_frame", size=8) + frame.add_signal(canmatrix.Signal(name = "Sig1", start_bit = 2, size = 13, is_little_endian=False )) + frame.add_signal(canmatrix.Signal(name = "Sig2", start_bit = 17, size = 14, is_little_endian=False)) + frame.add_signal(canmatrix.Signal(name = "Sig3", start_bit = 35, size = 6, is_little_endian=False)) + frame.add_signal(canmatrix.Signal(name = "Sig4", start_bit = 49, size = 8, is_little_endian=False)) + frame.compress() + assert frame.signal_by_name("Sig1").start_bit == 0 + assert frame.signal_by_name("Sig2").start_bit == 13 + assert frame.signal_by_name("Sig3").start_bit == 27 + assert frame.signal_by_name("Sig4").start_bit == 33 + + frame = canmatrix.Frame("my_frame", size=8) + # some signals overlap! + frame.add_signal(canmatrix.Signal(name = "Sig1", start_bit = 12, size = 12, is_little_endian=True)) + frame.add_signal(canmatrix.Signal(name = "Sig2", start_bit = 17, size = 9, is_little_endian=True)) + frame.add_signal(canmatrix.Signal(name = "Sig3", start_bit = 33, size = 5, is_little_endian=True)) + frame.add_signal(canmatrix.Signal(name = "Sig4", start_bit = 48, size = 9, is_little_endian=True)) + frame.compress() + assert frame.signal_by_name("Sig1").start_bit == 0 + assert frame.signal_by_name("Sig2").start_bit == 12 + assert frame.signal_by_name("Sig3").start_bit == 21 + assert frame.signal_by_name("Sig4").start_bit == 26 diff --git a/src/canmatrix/tests/test_copy.py b/src/canmatrix/tests/test_copy.py index 80bbfb1b..6340a770 100644 --- a/src/canmatrix/tests/test_copy.py +++ b/src/canmatrix/tests/test_copy.py @@ -77,7 +77,7 @@ def test_copy_ecu_with_attributes(): assert len(matrix1.frames) == 1 assert len(matrix1.ecus) == 1 assert matrix1.ecu_by_name("ECU") is not None - assert matrix1.ecu_by_name("ECU").attribute("Node Address") == 42 + assert matrix1.ecu_by_name("ECU").attribute("Node Address") == '42' assert matrix1.ecu_by_name("ECU").attribute("some_ecu_define", matrix1) == "default_value" def test_copy_frame_default_attributes(): diff --git a/src/canmatrix/tests/test_dbc.py b/src/canmatrix/tests/test_dbc.py index aababd88..bf60deda 100644 --- a/src/canmatrix/tests/test_dbc.py +++ b/src/canmatrix/tests/test_dbc.py @@ -53,6 +53,7 @@ def test_create_comment_string(): test_string = canmatrix.formats.dbc.create_comment_string("BO_", "ident", "some comment", "utf8", "utf8", "") assert test_string == b'CM_ BO_ ident "some comment";\n' + def test_parse_comment_from_dbc(): dbc = io.BytesIO(textwrap.dedent(u'''\ BO_ 1 someFrame: 1 someEcu @@ -64,6 +65,7 @@ def test_parse_comment_from_dbc(): matrix = canmatrix.formats.dbc.load(dbc) assert matrix.frames[0].signals[0].comment == "resistance setting (0-100%)" + def test_parse_multi_line_comment(): dbc = io.BytesIO(textwrap.dedent(u'''\ BO_ 1 someFrame: 1 someEcu @@ -240,6 +242,7 @@ def test_export_of_unknown_defines(): if line.startswith("BA_ "): assert line.endswith('";') + def test_braces_in_attributes(): dbc = io.BytesIO(textwrap.dedent(u'''\ BO_ 20 frameName: 1 someEcu @@ -249,6 +252,7 @@ def test_braces_in_attributes(): ''').encode('utf-8')) matrix = canmatrix.formats.dbc.load(dbc, dbcImportEncoding="utf8") + def test_defines_with_spaces(): dbc = io.BytesIO(textwrap.dedent(u'''\ BU_: someOtherEcu @@ -276,6 +280,7 @@ def test_defines_with_spaces(): assert matrix.env_vars["someEnvVar"]["attributes"]["some attrib"] == '"some space"' assert matrix.ecus[0].attributes["Description X"] == "Some Some Text" + def test_writing_complex_multiplex(): db = canmatrix.CanMatrix() frame = canmatrix.Frame("someFrame") @@ -307,6 +312,7 @@ def test_defines_with_special_cars(): matrix = canmatrix.formats.dbc.load(dbc, dbcImportEncoding="utf8") assert matrix.frames[0].signals[0].attributes["Accuracy"] == "+/- 10.2 at 55.1%" + def test_j1939_frametype(): dbc = io.BytesIO(textwrap.dedent(u'''\ BU_: someOtherEcu @@ -346,6 +352,7 @@ def test_attributes_with_spaces_before_semicolumn(): assert matrix.frames[0].attributes["someAttribute"] == 'str' assert matrix.frames[1].attribute("someAttribute", matrix) == 'asd' + def test_cycle_time_handling(): dbc = io.BytesIO(textwrap.dedent(u'''\ BO_ 17 Frame_1: 8 Vector__XXX @@ -381,6 +388,7 @@ def test_cycle_time_handling(): outdbc = io.BytesIO() canmatrix.formats.dump({"aa":matrix}, outdbc, "kcd") + def test_keep_cycle_time_defines(): dbc = io.BytesIO(textwrap.dedent(u'''\ BO_ 17 Frame_1: 8 Vector__XXX @@ -396,6 +404,7 @@ def test_keep_cycle_time_defines(): assert 'BA_DEF_ BO_ "GenMsgCycleTime" INT 0 50000' in outdbc.getvalue().decode('utf8') assert 'BA_DEF_DEF_ "GenMsgCycleTime" 0' in outdbc.getvalue().decode('utf8') + def test_unique_signal_names(): db = canmatrix.CanMatrix() frame = canmatrix.Frame("some Frame") @@ -413,6 +422,7 @@ def test_unique_signal_names(): assert "signal_name1" not in outdbc.getvalue().decode('utf8') assert "signal_name" in outdbc.getvalue().decode('utf8') + def test_signal_inital_value(): dbc = io.BytesIO(textwrap.dedent(u'''\ BO_ 17 Frame_1: 8 Vector__XXX @@ -480,6 +490,7 @@ def test_missing_space(): matrix = canmatrix.formats.dbc.load(dbc, dbcImportEncoding="utf8") assert matrix.frames[0].signals[0].name == "sig1" + def test_escaped_quotes(): dbc = io.BytesIO(textwrap.dedent(r''' BO_ 17 Frame_1: 8 Vector__XXX @@ -516,3 +527,62 @@ def test_without_ecu(): matrix = canmatrix.formats.dbc.load(dbc, dbcImportEncoding="utf8") matrix.frames[0].signals[0].name == "A_B_C_D_E" + + +def test_default_initial_value(): + dbc = io.BytesIO(textwrap.dedent(u'''\ + BO_ 560 ECU1_Message: 1 ECU1 + SG_ ECU2_Signal : 0|8@0+ (1,-5) [-2|250] "g" ECU2 + + BA_DEF_ SG_ "GenSigStartValue" FLOAT 0.0 100.0; + + BA_DEF_DEF_ "GenSigStartValue" 10.0; + ''').encode('utf-8')) + + matrix = canmatrix.formats.dbc.load(dbc, dbcImportEncoding="utf8") + assert matrix.frames[0].signals[0].initial_value == 10 +# outdbc = io.BytesIO() +# canmatrix.formats.dump(matrix, outdbc, "dbc") + + +def test_no_initial_value(): + dbc = io.BytesIO(textwrap.dedent(u'''\ + BO_ 560 ECU1_Message: 1 ECU1 + SG_ ECU2_Signal : 0|8@0+ (1,-5) [-2|250] "g" ECU2 + + BA_DEF_ SG_ "GenSigStartValue" FLOAT 0.0 100.0; + ''').encode('utf-8')) + + matrix = canmatrix.formats.dbc.load(dbc, dbcImportEncoding="utf8") + outdbc = io.BytesIO() + canmatrix.formats.dump(matrix, outdbc, "dbc") + assert 'BA_ "GenSigStartValue" SG_ 560 ECU2_Signal 5;' in outdbc.getvalue().decode('utf8') +# assert matrix.frames[0].signals[0].initial_value == 10 + + +def test_int_attribute_zero(): + db = canmatrix.CanMatrix() + frame = canmatrix.Frame("some Frame") + frame.add_signal(canmatrix.Signal("signal_name", size=1, start_bit=1)) + db.add_frame(frame) + db.add_ecu_defines("ecu_define", "INT 0 10") + db.add_ecu_defines("ecu_define2", "INT 0 10") + db.add_ecu_defines("ecu_define3", "INT 0 10") + + db.add_frame_defines("test", "INT 0 10") + db.add_frame_defines("test2", "INT 0 10") + frame.add_attribute("test", 7) + frame.add_attribute("test2", 0) + + ecu = canmatrix.Ecu('TestEcu') + ecu.add_attribute('ecu_define', 1) + ecu.add_attribute('ecu_define2', 0) + + db.add_ecu(ecu) + + outdbc = io.BytesIO() + canmatrix.formats.dump(db, outdbc, "dbc") + assert 'BO_ 0 7' in outdbc.getvalue().decode('utf8') + assert 'BO_ 0 0' in outdbc.getvalue().decode('utf8') + assert 'TestEcu 1' in outdbc.getvalue().decode('utf8') + assert 'TestEcu 0' in outdbc.getvalue().decode('utf8') diff --git a/src/canmatrix/tests/test_json.py b/src/canmatrix/tests/test_json.py index 7345d5b0..7a4e7a1b 100644 --- a/src/canmatrix/tests/test_json.py +++ b/src/canmatrix/tests/test_json.py @@ -115,6 +115,7 @@ def test_import_min_max(): assert matrix.frames[0].signals[0].min == -5 assert matrix.frames[0].signals[0].max == 42 + def test_import_native(): json_input = """{ "messages": [ @@ -193,7 +194,7 @@ def test_import_export_enums(): f_in = io.BytesIO(f_out.getvalue()) new_matrix = canmatrix.formats.json.load(f_in) - assert new_matrix.value_tables == {"Options": {0: "North", 1: "East", 2: "South", 3: "West"}} + assert new_matrix.value_tables == {"Options": {"0": "North", "1": "East", "2": "South", "3": "West"}} def test_export_native(): @@ -208,6 +209,7 @@ def test_export_native(): assert (data['messages'][0]['signals'][0]['factor'] == 0.123) assert (data['messages'][0]['signals'][0]['offset'] == 1) + def test_export_all_native(): matrix = canmatrix.canmatrix.CanMatrix() frame = canmatrix.canmatrix.Frame(name="test_frame", size=6, arbitration_id=10) @@ -222,3 +224,16 @@ def test_export_all_native(): assert (data['messages'][0]['signals'][0]['factor'] == 0.123) assert (data['messages'][0]['signals'][0]['offset'] == 1) assert (data['messages'][0]['is_fd'] is False) + + +def test_export_extended(): + matrix = canmatrix.canmatrix.CanMatrix() + frame = canmatrix.canmatrix.Frame(name="test_frame", size=6, arbitration_id=canmatrix.canmatrix.ArbitrationId(extended=True, id=10)) + frame.pgn=22 + signal = canmatrix.Signal(name="someSigName", size=40) + frame.add_signal(signal) + matrix.add_frame(frame) + out_file = io.BytesIO() + canmatrix.formats.dump(matrix, out_file, "json") +# data = json.loads(out_file.getvalue().decode("utf-8")) + matrix = canmatrix.formats.loads_flat(out_file.getvalue().decode("utf-8"), "json", jsonExportAll=True) diff --git a/tox.ini b/tox.ini index 33b2c4d1..ab77dc1c 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py{27,35,36,37,38,39,310}, pypy{,3}, mypy +envlist = py{37,38,39,310,311}, mypy [testenv] extras = @@ -12,8 +12,6 @@ deps= passenv= TOXENV CI - TRAVIS - TRAVIS_* APPVEYOR APPVEYOR_* commands= @@ -30,7 +28,7 @@ commands= [testenv:codecov] deps= - codecov==2.0.15 + codecov commands= codecov diff --git a/versioneer.py b/versioneer.py index 64fea1c8..9da01c2b 100644 --- a/versioneer.py +++ b/versioneer.py @@ -14,9 +14,6 @@ * [![Latest Version] (https://pypip.in/version/versioneer/badge.svg?style=flat) ](https://pypi.python.org/pypi/versioneer/) -* [![Build Status] -(https://travis-ci.org/warner/python-versioneer.png?branch=master) -](https://travis-ci.org/warner/python-versioneer) This is a tool for managing a recorded version number in distutils-based python projects. The goal is to remove the tedious and error-prone "update