pyxcp.transport package

Subpackages

Submodules

pyxcp.transport.base module

class pyxcp.transport.base.BaseTransport(config=None)

Bases: object

Base class for transport layers (Can, Eth, SxI).

Parameters:
  • config (dict-like) – Parameters like bitrate.
  • loglevel (["INFO", "WARN", "DEBUG", "ERROR", "CRITICAL"]) – Controls the verbosity of log messages.
PARAMETER_MAP = {'ALIGNMENT': (<class 'int'>, False, 1), 'CREATE_DAQ_TIMESTAMPS': (<class 'bool'>, False, False), 'LOGLEVEL': (<class 'str'>, False, 'WARN'), 'TIMEOUT': (<class 'float'>, False, 2.0)}
block_receive(length_required: int) → bytes

Implements packet reception for the block communication model (e.g. for XCP on CAN).

Parameters:length_required (int) – number of bytes to be expected in block response packets
Returns:all payload bytes received in block response packets
Return type:bytes
Raises:pyxcp.types.XcpTimeoutError
block_request(cmd, *data)

Implements packet transmission for the block communication model (e.g. DOWNLOAD block mode). All parameters are the same as in request(), but it does not receive a response.

close()

Close the transport-layer connection and event-loop.

closeConnection()

Does the actual connection shutdown. Needs to be implemented by any sub-class.

connect()
finishListener()
listen()
loadConfig(config)

Load configuration data.
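The PARAMETER_MAP entries above read naturally as (type, required, default) tuples. That reading is an assumption drawn from the listed values, not from pyxcp's source. The following minimal sketch shows how such a map could be applied to a dict-like config; apply_parameter_map is a hypothetical helper and is not part of pyxcp.

```python
# Hypothetical helper, not part of pyxcp: resolve a config against a
# parameter map whose entries are assumed to be (type, required, default).
PARAMETER_MAP = {
    "ALIGNMENT": (int, False, 1),
    "CREATE_DAQ_TIMESTAMPS": (bool, False, False),
    "LOGLEVEL": (str, False, "WARN"),
    "TIMEOUT": (float, False, 2.0),
}


def apply_parameter_map(config, parameter_map):
    """Return a dict holding a checked value for every mapped parameter."""
    config = config or {}
    resolved = {}
    for name, (expected_type, required, default) in parameter_map.items():
        if name not in config:
            if required:
                raise KeyError(f"{name} must be specified")
            resolved[name] = default
            continue
        value = config[name]
        is_plain_int = isinstance(value, int) and not isinstance(value, bool)
        # Accept an integer where a float is expected (e.g. TIMEOUT = 5).
        if expected_type is float and is_plain_int:
            value = float(value)
        # bool is a subclass of int, so reject it for int parameters.
        wrong_bool = expected_type is int and isinstance(value, bool)
        if wrong_bool or not isinstance(value, expected_type):
            raise TypeError(f"{name} must be of type {expected_type.__name__}")
        resolved[name] = value
    return resolved


settings = apply_parameter_map({"TIMEOUT": 5, "LOGLEVEL": "DEBUG"}, PARAMETER_MAP)
```

Parameters missing from the config fall back to their defaults, so settings contains all four keys.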

processResponse(response, length, counter, recv_timestamp=None)
process_event_packet(packet)
request(cmd, *data)
request_optional_response(cmd, *data)
send(frame)
startListener()
exception pyxcp.transport.base.Empty

Bases: Exception

pyxcp.transport.base.availableTransports()

List all subclasses of BaseTransport.

Returns:Mapping of transport name to transport class.
Return type:dict
pyxcp.transport.base.createTransport(name, *args, **kws)

Factory function for transports.

Returns:
Return type:BaseTransport derived instance.
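A subclass registry combined with a factory function can be sketched as follows. The classes here are local stand-ins, and the lower-cased class name used as the lookup key is an assumption made for illustration; the page does not say how pyxcp derives transport names.

```python
# Stand-alone sketch: local stand-in classes, not the pyxcp ones.
class BaseTransport:
    def __init__(self, config=None):
        self.config = config or {}


class Can(BaseTransport):
    pass


class Eth(BaseTransport):
    pass


def available_transports():
    """Map a name to every direct subclass of BaseTransport."""
    # Assumption: the lower-cased class name serves as the lookup key.
    return {cls.__name__.lower(): cls for cls in BaseTransport.__subclasses__()}


def create_transport(name, *args, **kws):
    """Instantiate the transport registered under the given name."""
    transports = available_transports()
    key = name.lower()
    if key not in transports:
        raise ValueError(f"{name!r} is not a known transport")
    return transports[key](*args, **kws)


transport = create_transport("eth", config={"HOST": "localhost"})
```

Because the registry is built from __subclasses__(), a new transport becomes available as soon as its class is defined, without a separate registration step.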
pyxcp.transport.base.get(q, timeout, restart_event)

Get an item from a deque considering a timeout condition.
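Reading from a deque with a timeout can be sketched by polling until a deadline passes. This is an illustration only: get_with_timeout and poll_interval are hypothetical names, and the restart_event argument of get() is not modeled because its semantics are not described here.

```python
import time
from collections import deque


class Empty(Exception):
    """Raised when no item arrived before the timeout expired."""


def get_with_timeout(q, timeout, poll_interval=0.001):
    """Pop the oldest item from q, waiting at most timeout seconds."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            return q.popleft()
        except IndexError:
            # The deque is empty: give up once the deadline has passed.
            if time.monotonic() >= deadline:
                raise Empty from None
            time.sleep(poll_interval)


q = deque([b"\xff\x00"])
first = get_with_timeout(q, timeout=0.05)

try:
    get_with_timeout(q, timeout=0.05)
    timed_out = False
except Empty:
    timed_out = True
```

time.monotonic() is used for the deadline so that adjustments to the system clock do not lengthen or shorten the wait.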

pyxcp.transport.can module

class pyxcp.transport.can.Can(config=None)

Bases: pyxcp.transport.base.BaseTransport

HEADER = <pyxcp.transport.can.EmptyHeader object>
HEADER_SIZE = 0
MAX_DATAGRAM_SIZE = 7
PARAMETER_MAP = {'BITRATE': (<class 'int'>, False, 250000), 'CAN_DRIVER': (<class 'str'>, True, None), 'CAN_ID_BROADCAST': (<class 'int'>, False, None), 'CAN_ID_MASTER': (<class 'int'>, True, None), 'CAN_ID_SLAVE': (<class 'int'>, True, None), 'CAN_USE_DEFAULT_LISTENER': (<class 'bool'>, False, True), 'CHANNEL': (<class 'str'>, False, ''), 'MAX_CAN_FD_DLC': (<class 'int'>, False, 64), 'MAX_DLC_REQUIRED': (<class 'bool'>, False, False), 'PADDING_VALUE': (<class 'int'>, False, 0), 'RECEIVE_OWN_MESSAGES': (<class 'bool'>, False, False)}
PARAMETER_TO_KW_ARG_MAP = {'BITRATE': 'bitrate', 'CHANNEL': 'channel', 'RECEIVE_OWN_MESSAGES': 'receive_own_messages'}
close()

Close the transport-layer connection and event-loop.

closeConnection()

Does the actual connection shutdown. Needs to be implemented by any sub-class.

connect()
dataReceived(payload: bytes, recv_timestamp: float = None)
listen()
send(frame)
class pyxcp.transport.can.CanInterfaceBase

Bases: object

Abstract CAN interface handler that can be implemented for any actual CAN device driver.

PARAMETER_MAP = {}
close()

Must implement any required action for disconnecting from the CAN interface.

connect()

Open the connection to the CAN interface.

getTimestampResolution()

Get the timestamp resolution in nanoseconds.

init(parent, receive_callback)

Must implement any required action for initializing the CAN interface.

Parameters:
  • parent (Can) – Refers to owner.
  • receive_callback (callable) – Receive callback function to register with the following argument: payload: bytes
loadConfig(config)

Load configuration data.

read()

Read incoming data.

transmit(payload: bytes)

Must transmit the given payload on the master CAN ID.

Parameters:payload (bytes) – payload to transmit
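The shape of a driver following the methods listed above can be sketched with a loopback stand-in. The class below is self-contained and deliberately does not inherit from CanInterfaceBase; LoopbackCanInterface and its internal attributes are hypothetical, and a real driver would talk to hardware instead of a list.

```python
class LoopbackCanInterface:
    """Stand-alone stand-in mirroring the method names listed above."""

    PARAMETER_MAP = {}

    def init(self, parent, receive_callback):
        # Keep the owner and the callback that receives incoming payloads.
        self.parent = parent
        self.receive_callback = receive_callback
        self.connected = False
        self._pending = []

    def connect(self):
        self.connected = True

    def close(self):
        self.connected = False

    def getTimestampResolution(self):
        # Resolution in nanoseconds; the value 1 is an arbitrary placeholder.
        return 1

    def transmit(self, payload: bytes):
        if not self.connected:
            raise RuntimeError("interface is not connected")
        # Loopback: every transmitted payload becomes an incoming one.
        self._pending.append(bytes(payload))

    def read(self):
        # Hand each pending payload to the registered callback.
        while self._pending:
            self.receive_callback(payload=self._pending.pop(0))


received = []
interface = LoopbackCanInterface()
interface.init(parent=None, receive_callback=lambda payload: received.append(payload))
interface.connect()
interface.transmit(b"\xff\x00")
interface.read()
interface.close()
```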
class pyxcp.transport.can.EmptyHeader

Bases: object

There is no header for XCP on CAN.

pack(*args, **kwargs)
class pyxcp.transport.can.Frame(id_: pyxcp.transport.can.Identifier, dlc: int, data: bytes, timestamp: int)

Bases: object

class pyxcp.transport.can.Identifier(raw_id: int)

Bases: object

Convenience class for XCP formatted CAN identifiers.

raw_id: int
Bit 31 set (i.e. 0x80000000, the most significant bit of a 32-bit value) signals an extended (29-bit) identifier.
Raises:IdentifierOutOfRangeError
id
Returns:Identifier as seen on bus.
Return type:int
is_extended
Returns:
  • True - 29-bit identifier.
  • False - 11-bit identifier.
Return type:bool
static make_identifier(identifier: int, extended: bool) → int

Factory method.

Parameters:
  • identifier (int) – Identifier as seen on bus.
  • extended (bool) –
    • True - 29-bit identifier.
    • False - 11-bit identifier.
Returns:
Return type:Identifier
Raises:IdentifierOutOfRangeError

raw_id
Returns:Raw XCP formatted identifier.
Return type:int
exception pyxcp.transport.can.IdentifierOutOfRangeError

Bases: Exception

Signals an identifier greater than MAX_11_BIT_IDENTIFIER or MAX_29_BIT_IDENTIFIER.
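The raw identifier format described above, where the flag 0x80000000 marks an extended identifier, can be sketched with a few standalone functions. The function names are hypothetical and the limits are the standard CAN maxima for 11-bit and 29-bit identifiers; this is not the Identifier class itself.

```python
CAN_EXTENDED_ID = 0x80000000
MAX_11_BIT_IDENTIFIER = (1 << 11) - 1  # 0x7FF
MAX_29_BIT_IDENTIFIER = (1 << 29) - 1  # 0x1FFFFFFF


class IdentifierOutOfRangeError(Exception):
    """Local stand-in for the exception of the same name."""


def make_raw_identifier(identifier, extended):
    """Combine an identifier as seen on the bus with the extended flag."""
    limit = MAX_29_BIT_IDENTIFIER if extended else MAX_11_BIT_IDENTIFIER
    if not 0 <= identifier <= limit:
        raise IdentifierOutOfRangeError(f"0x{identifier:X} exceeds 0x{limit:X}")
    return (identifier | CAN_EXTENDED_ID) if extended else identifier


def is_extended(raw_id):
    """True if the raw identifier carries the extended flag."""
    return bool(raw_id & CAN_EXTENDED_ID)


def strip_identifier(raw_id):
    """Remove the extended flag, leaving the identifier as seen on the bus."""
    return raw_id & ~CAN_EXTENDED_ID


raw = make_raw_identifier(0x18DAF110, extended=True)
```

Here raw equals 0x98DAF110, and strip_identifier(raw) gives back 0x18DAF110.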

pyxcp.transport.can.calculateFilter(ids: list)
Parameters:ids – An iterable (usually list or tuple) containing CAN identifiers.
Returns:Calculated filter and mask.
Return type:tuple (int, int)
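One common way to derive an acceptance filter and mask from a list of identifiers is to mark every bit position on which all identifiers agree as relevant. The sketch below uses that approach; whether calculateFilter works the same way is not stated here, and the width parameter and the accepts helper are hypothetical.

```python
from functools import reduce
from operator import and_, or_


def calculate_filter(ids, width=29):
    """Return (filter, mask) accepting at least every identifier in ids."""
    ids = list(ids)
    if not ids:
        raise ValueError("at least one identifier is required")
    all_bits = (1 << width) - 1
    common_ones = reduce(and_, ids)  # bits set in every identifier
    any_ones = reduce(or_, ids)      # bits set in at least one identifier
    # A bit is relevant (mask bit = 1) only where all identifiers agree.
    mask = ~(common_ones ^ any_ones) & all_bits
    return common_ones & mask, mask


def accepts(can_id, filter_, mask):
    """Usual acceptance test: compare only the bits selected by the mask."""
    return (can_id & mask) == (filter_ & mask)


filter_, mask = calculate_filter([0x100, 0x101], width=11)
```

A single filter and mask pair can let through identifiers that were not in the list: for 0x100 and 0x103 the two lowest bits become "don't care", so 0x101 and 0x102 pass as well.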
pyxcp.transport.can.isExtendedIdentifier(identifier: int) → bool

Check for extended CAN identifier.

Parameters:identifier (int) –
Returns:
Return type:bool
pyxcp.transport.can.padFrame(frame: bytes, padding_value: int, padding_len: int = 0) → bytes

Pad frame to next discrete DLC value.

References:
  • ISO/DIS 15765-4; 8.2 Data length Code (DLC)
  • AUTOSAR CP Release 4.3.0, Specification of CAN Transport Layer; 7.3.8 N-PDU padding
  • AUTOSAR CP Release 4.3.0, Specification of CAN Driver; [SWS_CAN_00502], [ECUC_Can_00485]
  • AUTOSAR CP Release 4.3.0, Requirements on CAN; [SRS_Can_01073], [SRS_Can_01086], [SRS_Can_01160]
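Padding a frame to the next discrete DLC value can be sketched as follows, assuming the CAN FD payload lengths 0 to 8, 12, 16, 20, 24, 32, 48 and 64. pad_to_next_dlc is a hypothetical function; the padding_len argument of padFrame is not modeled because its meaning is not described here.

```python
from bisect import bisect_left

# Payload lengths that a CAN FD frame can carry.
CAN_FD_LENGTHS = (0, 1, 2, 3, 4, 5, 6, 7, 8, 12, 16, 20, 24, 32, 48, 64)


def pad_to_next_dlc(frame, padding_value=0):
    """Append padding bytes until the frame has a valid CAN FD length."""
    if len(frame) > CAN_FD_LENGTHS[-1]:
        raise ValueError("frame exceeds 64 bytes")
    # Smallest permitted length that is >= the current length.
    target = CAN_FD_LENGTHS[bisect_left(CAN_FD_LENGTHS, len(frame))]
    return frame + bytes([padding_value]) * (target - len(frame))


padded = pad_to_next_dlc(bytes(10), padding_value=0xAA)
```

A 10-byte frame grows to 12 bytes, while frames of up to 8 bytes are returned unchanged.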

pyxcp.transport.can.registered_drivers()
Returns:Dictionary containing CAN driver names and classes of all available drivers (pyxcp supplied and user-defined).
Return type:dict (name, class)
pyxcp.transport.can.samplePointToTsegs(tqs: int, samplePoint: float) → tuple

Calculate TSEG1 and TSEG2 from the number of time quanta and the sample point.

Parameters:
  • tqs (int) – Number of time quanta.
  • samplePoint (float or int) – Sample point as a percentage value.
Returns:
Return type:tuple (TSEG1, TSEG2)
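A simple way to split a number of time quanta at a sample point is shown below. The sketch assumes that all quanta before the sample point count towards TSEG1 and the rest towards TSEG2. Whether the library treats the synchronization segment separately is not stated here, so this is an illustration rather than the library's formula.

```python
def sample_point_to_tsegs(tqs, sample_point):
    """Split tqs time quanta into (TSEG1, TSEG2) at sample_point percent."""
    if not 0 < sample_point < 100:
        raise ValueError("sample point must be between 0 and 100 percent")
    # Quanta before the sample point (assumption: sync segment not separated).
    tseg1 = round(tqs * sample_point / 100.0)
    # Remaining quanta after the sample point.
    tseg2 = tqs - tseg1
    return tseg1, tseg2


tsegs = sample_point_to_tsegs(16, 87.5)
```

With 16 time quanta and a sample point of 87.5 %, the split is 14 quanta before and 2 after the sample point.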

pyxcp.transport.can.setDLC(length: int)

Return DLC value according to CAN-FD.

Parameters:length – Length value to be mapped to a valid CAN-FD DLC (0 <= length <= 64).
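In CAN FD the 4-bit DLC codes 9 to 15 stand for payload lengths 12, 16, 20, 24, 32, 48 and 64. The sketch below maps a length to both the DLC code and the resulting payload length. Which of the two setDLC returns is not stated here, and dlc_for_length is a hypothetical name.

```python
# Index = 4-bit DLC code, value = payload length in bytes for CAN FD.
CAN_FD_DLC_LENGTHS = (0, 1, 2, 3, 4, 5, 6, 7, 8, 12, 16, 20, 24, 32, 48, 64)


def dlc_for_length(length):
    """Return (dlc_code, payload_length) for the smallest fitting DLC."""
    if not 0 <= length <= 64:
        raise ValueError("length must satisfy 0 <= length <= 64")
    for code, capacity in enumerate(CAN_FD_DLC_LENGTHS):
        if capacity >= length:
            return code, capacity


code, capacity = dlc_for_length(33)
```

A 33-byte payload needs DLC code 14, which corresponds to a 48-byte frame.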
pyxcp.transport.can.stripIdentifier(identifier: int) → int

Get raw CAN identifier (remove CAN_EXTENDED_ID bit if present).

Parameters:identifier (int) –
Returns:
Return type:int
pyxcp.transport.can.try_to_install_system_supplied_drivers()

Register available pyxcp CAN drivers.

pyxcp.transport.eth module

class pyxcp.transport.eth.Eth(config=None)

Bases: pyxcp.transport.base.BaseTransport

HEADER = <Struct object>
HEADER_SIZE = 4
MAX_DATAGRAM_SIZE = 512
PARAMETER_MAP = {'HOST': (<class 'str'>, False, 'localhost'), 'IPV6': (<class 'bool'>, False, False), 'PORT': (<class 'int'>, False, 5555), 'PROTOCOL': (<class 'str'>, False, 'TCP'), 'TCP_NODELAY': (<class 'bool'>, False, False)}
close()

Close the transport-layer connection and event-loop.

closeConnection()

Does the actual connection shutdown. Needs to be implemented by any sub-class.

connect()
invalidSocket
listen()
send(frame)
startListener()
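A 4-byte header, matching HEADER_SIZE = 4, can be sketched with struct. The layout below assumes a 16-bit length followed by a 16-bit counter, both little-endian, as is conventional for XCP on Ethernet. The format string "<HH" and the helper names build_frame and split_frame are assumptions made for illustration.

```python
import struct

# Assumed layout: payload length and frame counter, both 16-bit little-endian.
HEADER = struct.Struct("<HH")


def build_frame(payload, counter):
    """Prefix the payload with its length and a wrapping 16-bit counter."""
    return HEADER.pack(len(payload), counter & 0xFFFF) + payload


def split_frame(frame):
    """Return (counter, payload) from a frame built by build_frame."""
    length, counter = HEADER.unpack_from(frame)
    payload = frame[HEADER.size:HEADER.size + length]
    if len(payload) != length:
        raise ValueError("frame is shorter than its header announces")
    return counter, payload


frame = build_frame(b"\xff\x00", counter=1)
counter, payload = split_frame(frame)
```

Under these assumptions the two-byte payload is sent as b"\x02\x00\x01\x00\xff\x00".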

pyxcp.transport.sxi module

class pyxcp.transport.sxi.SxI(config=None)

Bases: pyxcp.transport.base.BaseTransport

HEADER = <Struct object>
HEADER_SIZE = 4
MAX_DATAGRAM_SIZE = 512
PARAMETER_MAP = {'BITRATE': (<class 'int'>, False, 38400), 'BYTESIZE': (<class 'int'>, False, 8), 'PARITY': (<class 'str'>, False, 'N'), 'PORT': (<class 'str'>, False, 'COM1'), 'STOPBITS': (<class 'int'>, False, 1)}
TIMEOUT = 0.75
closeConnection()

Does the actual connection shutdown. Needs to be implemented by any sub-class.

connect()
flush()
listen()
output(enable)
send(frame)

Module contents