Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions can/interfaces/cantact.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@
from unittest.mock import Mock

from can import BitTiming, BitTimingFd, BusABC, CanProtocol, Message

from ..exceptions import (
from can.exceptions import (
CanInitializationError,
CanInterfaceNotImplementedError,
error_check,
)
from ..typechecking import AutoDetectedConfig
from ..util import check_or_adjust_timing_clock, deprecated_args_alias
from can.typechecking import AutoDetectedConfig
from can.util import check_or_adjust_timing_clock, deprecated_args_alias

logger = logging.getLogger(__name__)

Expand Down
31 changes: 15 additions & 16 deletions can/interfaces/robotell.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,11 @@ def __init__(
port of underlying serial or usb device (e.g. ``/dev/ttyUSB0``, ``COM8``, ...)
Must not be empty. Can also end with ``@115200`` (or similarly) to specify the baudrate.
:param int ttyBaudrate:
baudrate of underlying serial or usb device (Ignored if set via the ``channel`` parameter)
baudrate of underlying serial or usb device
(Ignored if set via the ``channel`` parameter)
:param int bitrate:
CAN Bitrate in bit/s. Value is stored in the adapter and will be used as default if no bitrate is specified
CAN Bitrate in bit/s.
Value is stored in the adapter and will be used as default if no bitrate is specified
:param bool rtscts:
turn hardware handshake (RTS/CTS) on and off
"""
Expand Down Expand Up @@ -141,7 +143,8 @@ def set_hw_filter(self, filterid, enabled, msgid_value, msgid_mask, extended_msg
:param bool enabled:
This filter is enabled
:param int msgid_value:
CAN message ID to filter on. The test unit does not accept an extented message ID unless bit 31 of the ID was set.
CAN message ID to filter on.
The test unit does not accept an extented message ID unless bit 31 of the ID was set.
:param int msgid_mask:
Mask to apply to CAN messagge ID
:param bool extended_msg:
Expand All @@ -156,9 +159,9 @@ def set_hw_filter(self, filterid, enabled, msgid_value, msgid_mask, extended_msg
self._writeconfig(configid, msgid_value, msgid_mask)

def _getconfigsize(self, configid):
if configid == self._CAN_ART_ID or configid == self._CAN_ABOM_ID:
if configid in (self._CAN_ART_ID, self._CAN_ABOM_ID):
return 1
if configid == self._CAN_BAUD_ID or configid == self._CAN_INIT_FLASH_ID:
if configid in (self._CAN_BAUD_ID, self._CAN_INIT_FLASH_ID):
return 4
if configid == self._CAN_SERIALBPS_ID:
return 4
Expand All @@ -181,7 +184,7 @@ def _readconfig(self, configid, timeout):
newmsg = self._readmessage(not self._loopback_test, True, timeout)
if newmsg is None:
logger.warning(
f"Timeout waiting for response when reading config value {configid:04X}."
"Timeout waiting for response when reading config value %04X.", configid
)
return None
return newmsg[4:12]
Expand Down Expand Up @@ -236,7 +239,7 @@ def _readmessage(self, flushold, cfgchannel, timeout):
headpos = self._rxbuffer.find(header)
if headpos > 0:
# data does not start with expected header bytes. Log error and ignore garbage
logger.warning("Ignoring extra " + str(headpos) + " garbage bytes")
logger.warning("Ignoring extra %s garbage bytes", headpos)
del self._rxbuffer[:headpos]
headpos = self._rxbuffer.find(header) # should now be at index 0!

Expand Down Expand Up @@ -316,11 +319,7 @@ def _writemessage(self, msgid, msgdata, datalen, msgchan, msgformat, msgtype):
packet.append(self._PACKET_HEAD)
packet.append(self._PACKET_HEAD)
for msgbyte in msgbuf:
if (
msgbyte == self._PACKET_ESC
or msgbyte == self._PACKET_HEAD
or msgbyte == self._PACKET_TAIL
):
if msgbyte in (self._PACKET_ESC, self._PACKET_HEAD, self._PACKET_TAIL):
packet.append(self._PACKET_ESC)
packet.append(msgbyte)
packet.append(self._PACKET_TAIL)
Expand Down Expand Up @@ -395,9 +394,9 @@ def get_serial_number(self, timeout: int | None) -> str | None:
if sn2 is None:
return None

serial = ""
serial_number = ""
for idx in range(0, 8, 2):
serial += f"{sn1[idx]:02X}{sn1[idx + 1]:02X}-"
serial_number += f"{sn1[idx]:02X}{sn1[idx + 1]:02X}-"
for idx in range(0, 4, 2):
serial += f"{sn2[idx]:02X}{sn2[idx + 1]:02X}-"
return serial[:-1]
serial_number += f"{sn2[idx]:02X}{sn2[idx + 1]:02X}-"
return serial_number[:-1]
12 changes: 2 additions & 10 deletions can/interfaces/seeedstudio/seeedstudio.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,10 @@ def shutdown(self):
super().shutdown()
self.ser.close()

def init_frame(self, timeout=None):
def init_frame(self):
"""
Send init message to setup the device for comms. this is called during
interface creation.

:param timeout:
This parameter will be ignored. The timeout value of the channel is
used instead.
"""
byte_msg = bytearray()
byte_msg.append(0xAA) # Frame Start Byte 1
Expand All @@ -175,14 +171,10 @@ def init_frame(self, timeout=None):
def flush_buffer(self):
self.ser.flushInput()

def status_frame(self, timeout=None):
def status_frame(self):
"""
Send status request message over the serial device. The device will
respond but details of error codes are unknown but are logged - DEBUG.

:param timeout:
This parameter will be ignored. The timeout value of the channel is
used instead.
"""
byte_msg = bytearray()
byte_msg.append(0xAA) # Frame Start Byte 1
Expand Down
2 changes: 0 additions & 2 deletions can/interfaces/serial/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
""" """

__all__ = [
"SerialBus",
"serial_can",
Expand Down
19 changes: 11 additions & 8 deletions can/interfaces/slcan.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,21 @@ def __init__(
"""
:param str channel:
port of underlying serial or usb device (e.g. ``/dev/ttyUSB0``, ``COM8``, ...)
Must not be empty. Can also end with ``@115200`` (or similarly) to specify the baudrate.
Must not be empty. Can also end with ``@115200`` (or similarly)
to specify the baudrate.
:param int tty_baudrate:
baudrate of underlying serial or usb device (Ignored if set via the ``channel`` parameter)
baudrate of underlying serial or usb device
(Ignored if set via the ``channel`` parameter)
:param bitrate:
Bitrate in bit/s
:param timing:
Optional :class:`~can.BitTiming` instance to use for custom bit timing setting.
If this argument is set then it overrides the bitrate and btr arguments. The
`f_clock` value of the timing instance must be set to 8_000_000 (8MHz)
for standard CAN.
CAN FD and the :class:`~can.BitTimingFd` class have partial support according to the non-standard
slcan protocol implementation in the CANABLE 2.0 firmware: currently only data rates of 2M and 5M.
CAN FD and the :class:`~can.BitTimingFd` class have partial support according to
the non-standard slcan protocol implementation in the
CANABLE 2.0 firmware: currently only data rates of 2M and 5M.
:param poll_interval:
Poll interval in seconds when reading messages
:param sleep_after_open:
Expand Down Expand Up @@ -178,7 +181,7 @@ def set_bitrate(self, bitrate: int, data_bitrate: int | None = None) -> None:
if bitrate in self._BITRATES:
bitrate_code = self._BITRATES[bitrate]
else:
bitrates = ", ".join(str(k) for k in self._BITRATES.keys())
bitrates = ", ".join(str(k) for k in self._BITRATES)
raise ValueError(f"Invalid bitrate, choose one of {bitrates}.")

# If data_bitrate is None, we set it to 0 which means no data bitrate
Expand All @@ -188,7 +191,7 @@ def set_bitrate(self, bitrate: int, data_bitrate: int | None = None) -> None:
if data_bitrate in self._DATA_BITRATES:
dbitrate_code = self._DATA_BITRATES[data_bitrate]
else:
dbitrates = ", ".join(str(k) for k in self._DATA_BITRATES.keys())
dbitrates = ", ".join(str(k) for k in self._DATA_BITRATES)
raise ValueError(f"Invalid data bitrate, choose one of {dbitrates}.")

self.close()
Expand Down Expand Up @@ -216,8 +219,8 @@ def _read(self, timeout: float | None) -> str | None:

with error_check("Could not read from serial device"):
while True:
# Due to accessing `serialPortOrig.in_waiting` too often will reduce the performance.
# We read the `serialPortOrig.in_waiting` only once here.
# Due to accessing `serialPortOrig.in_waiting` too often will reduce
# the performance. We read the `serialPortOrig.in_waiting` only once here.
size = self.serialPortOrig.in_waiting or 1
self._buffer.extend(self.serialPortOrig.read(size))

Expand Down
14 changes: 12 additions & 2 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ extras =
mf4
multicast
gs-usb
serial
pywin32
serial
# still no windows-curses for py314
Expand Down Expand Up @@ -58,6 +57,8 @@ basepython = py313
dependency_groups =
lint
extras =
canalystii
gs-usb
viewer
commands =
black --check .
Expand All @@ -67,7 +68,16 @@ commands =
can/io \
doc/conf.py \
examples/**.py \
can/interfaces/socketcan
can/interfaces/canalystii \
can/interfaces/cantact \
can/interfaces/gs_usb.py \
can/interfaces/iscan.py \
can/interfaces/robotell.py \
can/interfaces/seeedstudio \
can/interfaces/serial \
can/interfaces/slcan.py \
can/interfaces/socketcan \
can/interfaces/virtual.py

[testenv:type]
description = Run type checker
Expand Down
Loading