跳到主要内容

CoE (CANopen over EtherCAT)

通过 slave.coe 访问。从站不支持 CoE 时为 None

slave.coeCoE 类的实例,提供 SDO 读写功能。

快速开始

slave = master[1]

# 读取 SDO 原始字节
data = slave.coe.sdo_read(0x1018, 1) # Vendor ID

# 便捷数值读取
status = slave.coe.sdo_read_value(0x6041, 0, dtype='u16')
position = slave.coe.sdo_read_value(0x6064, 0, dtype='i32')

# 便捷数值写入
slave.coe.sdo_write_value(0x6040, 0, 0x0006, dtype='u16')
slave.coe.sdo_write_value(0x607A, 0, 100000, dtype='i32')

# 原始字节写入
import struct
slave.coe.sdo_write(0x6040, 0, struct.pack('<H', 0x0006))

SDO 读取

sdo_read()

def sdo_read(self, index: int, subindex: int = 0,
complete_access: bool = False) -> Optional[bytes]

读取 SDO 对象,返回原始字节数据。

参数:

  • index (int) — 对象字典索引 (0x0000-0xFFFF)
  • subindex (int) — 子索引
  • complete_access (bool) — 是否完整访问

返回值:

  • bytes | None — 读取到的字节数据,失败返回 None

示例:

data = slave.coe.sdo_read(0x1000, 0)
if data is not None:
import struct
device_type = struct.unpack('<I', data[:4])[0]
print(f"Device Type: 0x{device_type:08X}")

sdo_read_value()

def sdo_read_value(self, index: int, subindex: int = 0,
dtype: str = 'u32') -> Optional[int]

读取 SDO 并解析为数值。

参数:

  • index (int) — 对象字典索引
  • subindex (int) — 子索引
  • dtype (str) — 数据类型: 'u8', 'u16', 'u32', 'u64', 'i8', 'i16', 'i32', 'i64'

返回值:

  • int | None — 解析后的数值,失败返回 None

示例:

# 读取状态字 (StatusWord, 0x6041:00)
status = slave.coe.sdo_read_value(0x6041, 0, dtype='u16')
if status is not None:
print(f"StatusWord = 0x{status:04X}")

# 读取实际位置 (0x6064:00)
position = slave.coe.sdo_read_value(0x6064, 0, dtype='i32')
print(f"实际位置: {position}")

SDO 写入

sdo_write()

def sdo_write(self, index: int, subindex: int, data: bytes,
complete_access: bool = False) -> bool

写入 SDO 对象。

参数:

  • index (int) — 对象字典索引
  • subindex (int) — 子索引
  • data (bytes) — 要写入的字节数据
  • complete_access (bool) — 是否完整访问

返回值:

  • bool — 写入是否成功

示例:

import struct

# 写入控制字
slave.coe.sdo_write(0x6040, 0, struct.pack('<H', 0x0006))

# 写入目标位置
slave.coe.sdo_write(0x607A, 0, struct.pack('<i', 100000))

sdo_write_value()

def sdo_write_value(self, index: int, subindex: int, value: int,
dtype: str = 'u32') -> bool

写入数值类型的 SDO。

参数:

  • index (int) — 对象字典索引
  • subindex (int) — 子索引
  • value (int) — 要写入的数值
  • dtype (str) — 数据类型: 'u8', 'u16', 'u32', 'u64', 'i8', 'i16', 'i32', 'i64'

返回值:

  • bool — 写入是否成功

示例:

# 写入控制字
slave.coe.sdo_write_value(0x6040, 0, 0x0006, dtype='u16')

# 写入目标位置
slave.coe.sdo_write_value(0x607A, 0, 100000, dtype='i32')

# 设置操作模式
slave.coe.sdo_write_value(0x6060, 0, 8, dtype='i8') # CSP 模式

批量异步读取

read_multiple_async()

async def read_multiple_async(self, entries: list,
cancel_event=None) -> dict

批量 SDO 读取 — 一次调用读取多个对象。内部通过 asyncio.run_in_executor 在线程池中顺序执行,避免逐个 await 造成的上下文切换开销。

参数:

  • entries (list) — [(index, subindex), ...] 待读取的对象列表
  • cancel_event (asyncio.Event, 可选) — set 后中止剩余读取,抛出 CancelledError

返回值:

  • dict{(index, subindex): bytes 或 None},读取失败的条目值为 None

示例:

import asyncio

async def read_servo_status():
results = await slave.coe.read_multiple_async([
(0x6041, 0), # StatusWord
(0x6064, 0), # 实际位置
(0x6077, 0), # 实际力矩
])
for (idx, sub), data in results.items():
if data is not None:
print(f"0x{idx:04X}:{sub} = {data.hex()}")

asyncio.run(read_servo_status())

带取消的示例:

import asyncio

cancel = asyncio.Event()

async def read_with_cancel():
try:
results = await slave.coe.read_multiple_async(
[(0x6041, 0), (0x6064, 0)],
cancel_event=cancel
)
except asyncio.CancelledError:
print("读取已取消")

属性

属性类型访问说明
last_sdo_errorSDOError只读最后一次 SDO 操作的错误码

last_sdo_error

@property
def last_sdo_error(self) -> SDOError

最后一次 SDO 操作的错误码。当 sdo_read() / sdo_write() 失败时,可通过此属性获取 SDO 中止码 (Abort Code)。成功操作后自动重置为 SDOError.NO_ERROR

示例:

data = slave.coe.sdo_read(0x6041, 0)
if data is None:
error = slave.coe.last_sdo_error
print(f"SDO 读取失败: {error.name} (0x{error.value:08X})")
print(f"描述: {error.describe()}")

EMCY 紧急消息

CoE 支持 CiA 301 紧急消息 (Emergency) 记录。每个从站自动收集该从站的 EMCY 消息。

Python SDK 通过两种方式访问 EMCY 历史:

方式一: 通过从站属性

# 获取 EMCY 消息数量
count = slave.emcy_count

# 获取 EMCY 历史记录列表
history = slave.emcy_history
# 返回 [{'error_code': int, 'error_register': int, 'data': bytes, 'timestamp_ms': int}, ...]

for msg in history:
print(f"EMCY 0x{msg['error_code']:04X} 寄存器=0x{msg['error_register']:02X}")

# 清除 EMCY 历史
slave.clear_emcy()

方式二: 通过 CoEEmcyRecorder

CoEEmcyRecorder 提供更丰富的 EMCY 历史记录管理,包含 EmergencyMessage 结构体和错误描述。

from ethercat.slave import EmergencyMessage, CoEEmcyRecorder

# CoEEmcyRecorder 通常由事件系统自动管理
# 手动创建示例:
recorder = CoEEmcyRecorder(slave_index=1)

EmergencyMessage 结构

class EmergencyMessage:
error_code: int # 紧急错误代码 (CiA 301 Table 24)
error_register: int # 错误寄存器 (对象 0x1001)
data: bytes # 厂商特定数据 (5 字节)
slave_index: int # 来源从站编号
timestamp: datetime # 接收时间

def get_error_description(self) -> str
# 根据 CiA 301 标准错误代码高字节返回分类描述

get_error_description() 返回中文描述,如 "电流错误"、"温度错误"、"通信错误" 等。

CoEEmcyRecorder API

# 获取 EMCY 历史记录 (返回副本)
history = recorder.get_emergency_history() # -> List[EmergencyMessage]

# 清除历史记录
recorder.clear_emergency_history()

# 当前记录数量
count = recorder.count

# 最大历史容量 (默认 256,类常量)
max_size = CoEEmcyRecorder.MAX_HISTORY_SIZE

示例:

# 读取 EMCY 历史
history = recorder.get_emergency_history()
for msg in history:
print(f"EMCY 0x{msg.error_code:04X}: {msg.get_error_description()} "
f"寄存器=0x{msg.error_register:02X} "
f"时间={msg.timestamp:%H:%M:%S.%f}")

# 清除历史
recorder.clear_emergency_history()

高级功能: 对象字典树 (ODList)

ODList 类提供完整的对象字典树遍历功能,自动加载从站 OD 结构。

ODList 为高级 API

ODList(master._dll, master.master_index, ...) 属于内部高级 API, 仅在需要完整 OD 树遍历 (诊断工具/HMI 浏览) 时使用。常规 SDO 读写通过 slave.coe.sdo_read_value() / sdo_write_value() 即可, 无需加载完整 OD。 高级 API 的句柄参数后续可能调整, 请勿在生产代码中长期固化该构造形式。

from ethercat import ODList

# 加载从站对象字典
od = ODList(master._dll, master.master_index, 1, slave.coe)

if od.loaded:
# 遍历所有索引
for obj in od:
print(f"0x{obj.index:04X}: {obj.name} (subs={len(obj)})")

# 遍历子索引
for entry in obj:
print(f" [{entry.subindex}] {entry.name}: {entry.data_type.name}")

# 按索引访问
if 0x6041 in od:
entry = od[0x6041][0]
value = entry.read_u16()
print(f"StatusWord = 0x{value:04X}")

copy()

def copy(self) -> 'ODList'

创建对象字典的缓存副本(结构 + 值的快照)。副本中的值为快照,不再实时读取。

示例:

# 创建 ODList 快照
od_snapshot = od.copy()

# 创建单个 ObjectDictionary 的快照
if 0x6040 in od:
od_copy = od[0x6040].copy()

ObjectDictionary 属性

  • index (int) — 对象索引号
  • name (str) — 对象名称
  • data_type (int) — 数据类型标识
  • object_code (int) — 对象代码(变量/数组/记录)
  • len(od) (int) — 子对象数量

ObjectDictionary 方法

# 按子索引获取 ObjectEntry
entry = od[0x6040][0]

# 按名称获取
entry = od[0x1018]["Vendor ID"]

# 批量读取所有子索引
data_dict = od[0x1018].read_all() # -> Dict[int, bytes]

# SDO 写入整个对象
od[0x6040].sdo_write(data, use_complete_access=False)
od[0x6040].sdo_write(subindex, data, use_complete_access=False)

ObjectEntry 类型化读写

通过 ODList 获取的 ObjectEntry 对象支持类型化读写方法:

  • read_u8() / write_u8(v) — 无符号 8 位
  • read_u16() / write_u16(v) — 无符号 16 位
  • read_u32() / write_u32(v) — 无符号 32 位
  • read_u64() / write_u64(v) — 无符号 64 位
  • read_i8() / write_i8(v) — 有符号 8 位
  • read_i16() / write_i16(v) — 有符号 16 位
  • read_i32() / write_i32(v) — 有符号 32 位
  • read_i64() / write_i64(v) — 有符号 64 位
  • read_f32() / write_f32(v) — 32 位浮点
  • read_f64() / write_f64(v) — 64 位浮点
  • read_string() / write_string(v) — 字符串
  • read_typed() — 根据数据类型自动解析

ObjectEntry 属性

  • subindex (int) — 子索引
  • name (str) — 名称
  • data_type (EcDataType) — 数据类型
  • bit_length (int) — 位长度
  • obj_access (ObjAccess) — 访问权限标志
  • bytes (bytes) — 原始字节读写
  • can_read (bool) — 是否可读
  • can_write (bool) — 是否可写
  • is_read_only (bool) — 是否只读
  • access_description (str) — 权限描述文字(中文)

示例:

entry = od[0x6040][0]
if entry.can_write:
entry.write_u16(0x0006)

print(f"访问权限: {entry.access_description}")

SDOError 枚举

SDO 操作错误代码(CANopen 标准 / ETG.1020)。SDOError 提供 describe() 方法获取中文描述。

from ethercat.data.types import SDOError
名称说明
NO_ERROR0x00000000无错误
TOGGLE_BIT_NOT_CHANGED0x05030000Toggle 位未改变
SDO_PROTOCOL_TIMEOUT0x05040000SDO 协议超时
INVALID_COMMAND_SPECIFIER0x05040001无效的命令标识符
OUT_OF_MEMORY0x05040005内存不足
UNSUPPORTED_ACCESS0x06010000不支持的访问
READ_WRITE_ONLY_ERROR0x06010001尝试读取只写对象
WRITE_READ_ONLY_ERROR0x06010002尝试写入只读对象
SUBINDEX_WRITE_ERROR0x06010003子索引不允许写入
COMPLETE_ACCESS_NOT_SUPPORTED0x06010004不支持完全访问
OBJECT_LENGTH_EXCEEDED0x06010005对象长度超限
OBJECT_MAPPED_TO_RXPDO0x06010006对象已映射到 RxPDO
OBJECT_DOES_NOT_EXIST0x06020000对象不存在
CANNOT_BE_MAPPED_TO_PDO0x06040041不可映射到 PDO
EXCEEDS_PDO_LENGTH0x06040042超出 PDO 长度
PARAMETER_INCOMPATIBILITY0x06040043参数不兼容
INTERNAL_INCOMPATIBILITY0x06040047内部不兼容
HARDWARE_ACCESS_ERROR0x06060000硬件访问错误
DATA_TYPE_MISMATCH0x06070010数据类型不匹配
DATA_TYPE_TOO_HIGH0x06070012数据类型长度过大
DATA_TYPE_TOO_LOW0x06070013数据类型长度过小
SUBINDEX_DOES_NOT_EXIST0x06090011子索引不存在
VALUE_RANGE_EXCEEDED0x06090030值超出范围
VALUE_TOO_HIGH0x06090031值过大
VALUE_TOO_LOW0x06090032值过小
MODULE_IDENT_LIST_MISMATCH0x06090033模块列表不匹配
MAX_LESS_THAN_MIN0x06090036最大值小于最小值
GENERAL_ERROR0x08000000一般错误
DATA_TRANSFER_ERROR0x08000020数据传输错误
LOCAL_CONTROL_ERROR0x08000021本地控制错误
DEVICE_STATE_ERROR0x08000022设备状态错误
DICTIONARY_ERROR0x08000023字典错误
UNKNOWN_ERROR0xFFFFFFFF未知错误

EcDataType 枚举

EtherCAT 数据类型枚举(基于 ETG.1000.6),决定 ObjectEntry 类型化读写的自动转换目标。

from ethercat.slave.coe import EcDataType
枚举值Python 类型说明
BOOLEAN0x0001bool布尔
INTEGER80x0002intint8
INTEGER160x0003intint16
INTEGER320x0004intint32
UNSIGNED80x0005intuint8
UNSIGNED160x0006intuint16
UNSIGNED320x0007intuint32
REAL320x0008float单精度浮点
VISIBLE_STRING0x0009strASCII 字符串
OCTET_STRING0x000Abytes字节串
UNICODE_STRING0x000BstrUnicode 字符串
REAL640x0011float双精度浮点
INTEGER640x0015intint64
UNSIGNED640x001Bintuint64

EcDataType 辅助属性

# 获取类型对应的字节大小
size = EcDataType.UNSIGNED16.byte_size # 2

# 获取 struct 格式字符串
fmt = EcDataType.INTEGER32.struct_fmt # '<i'

ObjAccess 访问权限标志

from ethercat.slave.coe import ObjAccess

# 检查权限
if entry.obj_access & ObjAccess.READABLE:
print("可读")
if entry.obj_access & ObjAccess.WRITE_OP:
print("OP 状态可写")

# 获取中文描述
print(entry.obj_access.access_description) # "读写(PreOP,SafeOP,OP可写)"
标志说明
NONE0x0000无权限
READ_PRE0x0001PreOp 可读
READ_SAFE0x0002SafeOp 可读
READ_OP0x0004OP 可读
WRITE_PRE0x0008PreOp 可写
WRITE_SAFE0x0010SafeOp 可写
WRITE_OP0x0020OP 可写
READABLE0x0007所有状态可读
WRITABLE0x0038所有状态可写

完整示例

SDO 读写

from ethercat import EtherCATMaster, EcState

with EtherCATMaster() as master:
master.set_network(r"\\Device\\NPF_{GUID}")
master.set_state(EcState.OP)
master.start()

slave = master[1]

# 读取设备信息
vendor_id = slave.coe.sdo_read_value(0x1018, 1, dtype='u32')
product_code = slave.coe.sdo_read_value(0x1018, 2, dtype='u32')
print(f"VendorID=0x{vendor_id:08X}, ProductCode=0x{product_code:08X}")

# 读取状态字
status = slave.coe.sdo_read_value(0x6041, 0, dtype='u16')
print(f"StatusWord = 0x{status:04X}")

# 写入控制字
slave.coe.sdo_write_value(0x6040, 0, 0x0006, dtype='u16')

# 设置操作模式
slave.coe.sdo_write_value(0x6060, 0, 8, dtype='i8')

# 错误处理
data = slave.coe.sdo_read(0x9999, 0)
if data is None:
error = slave.coe.last_sdo_error
print(f"SDO 错误: {error.name} - {error.describe()}")

master.stop()

对象字典扫描

from ethercat import ODList

od = ODList(master._dll, master.master_index, 1, slave.coe)
if od.loaded:
for obj in od:
print(f"0x{obj.index:04X}: {obj.name}")
for entry in obj:
value = entry.read_typed()
print(f" [{entry.subindex}] {entry.name} = {value}")

# 创建快照副本
snapshot = od.copy()

EMCY 历史记录

# 方式一: 通过从站属性
count = slave.emcy_count
history = slave.emcy_history
for msg in history:
print(f"EMCY 0x{msg['error_code']:04X}")
slave.clear_emcy()

# 方式二: 通过 CoEEmcyRecorder
from ethercat.slave import CoEEmcyRecorder
recorder = CoEEmcyRecorder(slave_index=1)
for msg in recorder.get_emergency_history():
print(f"EMCY 0x{msg.error_code:04X}: {msg.get_error_description()}")
recorder.clear_emergency_history()

CoE 诊断历史 (0x10F3)

ETG.1020 §16 定义了 CoE 诊断历史对象 0x10F3, 从站通过环形缓冲区保存 运行期诊断消息 (Octet-Diagnosis). SDK 提供轻量级的运行时监听接口, 不需要一次性拉取全部历史记录, 适用于 PLC 循环中快速轮询新事件。

read_diagnostic_messages() 的区别
  • read_diagnostic_messages() — 一次性把整个 0x10F3 缓冲区拉回主站, 开销较大
  • poll_has_new_diagnostic() + read_diagnostic_message(idx) — 只拉发生变化的单条消息, 适合运行时周期轮询

poll_has_new_diagnostic()

def poll_has_new_diagnostic(self) -> int

快速轮询 0x10F3:04 NewMessagesAvailable 标志, 判断是否有尚未读取的诊断消息。

返回值:

  • 1 — 有新消息 (建议紧跟 read_diagnostic_meta() + read_diagnostic_message())
  • 0 — 无新消息
  • -1 — 通信失败 (可回退到 read_diagnostic_messages())

示例:

rc = slave.coe.poll_has_new_diagnostic()
if rc == 1:
print("有新诊断消息")

read_diagnostic_meta()

def read_diagnostic_meta(self) -> Optional[DiagMeta]

读取 0x10F3:01-05 元数据块, 返回 DiagMeta 数据类, 失败返回 None

相关结构:

@dataclass
class DiagMeta:
max_messages: int = 0 # 0x10F3:01, 从站诊断缓冲区容量 (通常 <=250)
newest_message: int = 0 # 0x10F3:02, 最新消息在 0x10F3 中的 subindex
newest_acknowledged: int = 0 # 0x10F3:03, 最后确认的 subindex
flags: int = 0 # 0x10F3:05, bit4=Ring/Linear, bit5=Overrun
判断缓冲区类型 / 溢出

flags 位定义:

  • bit4 (0x10) — 1 = Ring Buffer, 0 = Linear
  • bit5 (0x20) — 1 = 发生过 Overrun (有消息被覆盖)

read_diagnostic_message()

def read_diagnostic_message(self, msg_subidx: int,
buf_size: int = 1024) -> Optional[bytes]

读取 0x10F3:msg_subidx 指定一条 Octet-Diagnosis 原始字节。

参数:

  • msg_subidx (int) — subindex 范围 [6, 255] (ETG.1020)
  • buf_size (int) — 接收缓冲字节数, 默认 1024

返回值:

  • bytes | None — 原始字节 (实际长度); 失败返回 None

acknowledge_diagnostic()

def acknowledge_diagnostic(self, msg_subidx: int) -> bool

0x10F3:03 = msg_subidx 确认已处理该消息 (Ring Buffer 推进)。 应用层处理完一条消息后应及时确认, 否则 newest_acknowledged 不推进, 可能导致 Overrun。

示例: 周期性轮询监听

while running:
if slave.coe.poll_has_new_diagnostic() == 1:
meta = slave.coe.read_diagnostic_meta()
if meta is None:
continue

# 检查 Ring Buffer / Overrun
is_ring = bool(meta.flags & 0x10)
has_overrun = bool(meta.flags & 0x20)
if has_overrun:
print(f"警告: 诊断缓冲区溢出, 部分消息丢失")

# 从 newest_acknowledged+1 读到 newest_message
start = (meta.newest_acknowledged + 1) if meta.newest_acknowledged >= 6 else 6
end = meta.newest_message
if start <= end:
for idx in range(start, end + 1):
raw = slave.coe.read_diagnostic_message(idx)
if raw:
print(f"[0x10F3:{idx:02X}] {raw.hex()}")
slave.coe.acknowledge_diagnostic(idx)

time.sleep(0.1) # 100ms 轮询
0x10F3 与 EMCY 的关系

0x10F3 诊断历史与 CiA 301 EMCY 是两套独立机制:

  • EMCY 通过邮箱 主动推送 致命错误 (从站异步发送)
  • 0x10F3 由主站 主动轮询 读取历史记录 (从站被动响应)

两者可以共存, 对应不同诊断层级。