CoE (CANopen over EtherCAT)
通过 slave.coe 访问。从站不支持 CoE 时为 None。
slave.coe 是 CoE 类的实例,提供 SDO 读写功能。
快速开始
slave = master[1]
# 读取 SDO 原始字节
data = slave.coe.sdo_read(0x1018, 1) # Vendor ID
# 便捷数值读取
status = slave.coe.sdo_read_value(0x6041, 0, dtype='u16')
position = slave.coe.sdo_read_value(0x6064, 0, dtype='i32')
# 便捷数值写入
slave.coe.sdo_write_value(0x6040, 0, 0x0006, dtype='u16')
slave.coe.sdo_write_value(0x607A, 0, 100000, dtype='i32')
# 原始字节写入
import struct
slave.coe.sdo_write(0x6040, 0, struct.pack('<H', 0x0006))
SDO 读取
sdo_read()
def sdo_read(self, index: int, subindex: int = 0,
complete_access: bool = False) -> Optional[bytes]
读取 SDO 对象,返回原始字节数据。
参数:
index(int) — 对象字典索引 (0x0000-0xFFFF)subindex(int) — 子索引complete_access(bool) — 是否完整访问
返回值:
bytes | None— 读取到的字节数据,失败返回None
示例:
data = slave.coe.sdo_read(0x1000, 0)
if data is not None:
import struct
device_type = struct.unpack('<I', data[:4])[0]
print(f"Device Type: 0x{device_type:08X}")
sdo_read_value()
def sdo_read_value(self, index: int, subindex: int = 0,
dtype: str = 'u32') -> Optional[int]
读取 SDO 并解析为数值。
参数:
index(int) — 对象字典索引subindex(int) — 子索引dtype(str) — 数据类型:'u8','u16','u32','u64','i8','i16','i32','i64'
返回值:
int | None— 解析后的数值,失败返回None
示例:
# 读取状态字 (StatusWord, 0x6041:00)
status = slave.coe.sdo_read_value(0x6041, 0, dtype='u16')
if status is not None:
print(f"StatusWord = 0x{status:04X}")
# 读取实际位置 (0x6064:00)
position = slave.coe.sdo_read_value(0x6064, 0, dtype='i32')
print(f"实际位置: {position}")
SDO 写入
sdo_write()
def sdo_write(self, index: int, subindex: int, data: bytes,
complete_access: bool = False) -> bool
写入 SDO 对象。
参数:
index(int) — 对象字典索引subindex(int) — 子索引data(bytes) — 要写入的字节数据complete_access(bool) — 是否完整访问
返回值:
bool— 写入是否成功
示例:
import struct
# 写入控制字
slave.coe.sdo_write(0x6040, 0, struct.pack('<H', 0x0006))
# 写入目标位置
slave.coe.sdo_write(0x607A, 0, struct.pack('<i', 100000))
sdo_write_value()
def sdo_write_value(self, index: int, subindex: int, value: int,
dtype: str = 'u32') -> bool
写入数值类型的 SDO。
参数:
index(int) — 对象字典索引subindex(int) — 子索引value(int) — 要写入的数值dtype(str) — 数据类型:'u8','u16','u32','u64','i8','i16','i32','i64'
返回值:
bool— 写入是否成功
示例:
# 写入控制字
slave.coe.sdo_write_value(0x6040, 0, 0x0006, dtype='u16')
# 写入目标位置
slave.coe.sdo_write_value(0x607A, 0, 100000, dtype='i32')
# 设置操作模式
slave.coe.sdo_write_value(0x6060, 0, 8, dtype='i8') # CSP 模式
批量异步读取
read_multiple_async()
async def read_multiple_async(self, entries: list,
cancel_event=None) -> dict
批量 SDO 读取 — 一次调用读取多个对象。内部通过 asyncio.run_in_executor 在线程池中顺序执行,避免逐个 await 造成的上下文切换开销。
参数:
entries(list) —[(index, subindex), ...]待读取的对象列表cancel_event(asyncio.Event, 可选) — set 后中止剩余读取,抛出CancelledError
返回值:
dict—{(index, subindex): bytes 或 None},读取失败的条目值为None
示例:
import asyncio
async def read_servo_status():
results = await slave.coe.read_multiple_async([
(0x6041, 0), # StatusWord
(0x6064, 0), # 实际位置
(0x6077, 0), # 实际力矩
])
for (idx, sub), data in results.items():
if data is not None:
print(f"0x{idx:04X}:{sub} = {data.hex()}")
asyncio.run(read_servo_status())
带取消的示例:
import asyncio
cancel = asyncio.Event()
async def read_with_cancel():
try:
results = await slave.coe.read_multiple_async(
[(0x6041, 0), (0x6064, 0)],
cancel_event=cancel
)
except asyncio.CancelledError:
print("读取已取消")
属性
| 属性 | 类型 | 访问 | 说明 |
|---|---|---|---|
| last_sdo_error | SDOError | 只读 | 最后一次 SDO 操作的错误码 |
last_sdo_error
@property
def last_sdo_error(self) -> SDOError
最后一次 SDO 操作的错误码。当 sdo_read() / sdo_write() 失败时,可通过此属性获取 SDO 中止码 (Abort Code)。成功操作后自动重置为 SDOError.NO_ERROR。
示例:
data = slave.coe.sdo_read(0x6041, 0)
if data is None:
error = slave.coe.last_sdo_error
print(f"SDO 读取失败: {error.name} (0x{error.value:08X})")
print(f"描述: {error.describe()}")
EMCY 紧急消息
CoE 支持 CiA 301 紧急消息 (Emergency) 记录。每个从站自动收集该从站的 EMCY 消息。
Python SDK 通过两种方式访问 EMCY 历史:
方式一: 通过从站属性
# 获取 EMCY 消息数量
count = slave.emcy_count
# 获取 EMCY 历史记录列表
history = slave.emcy_history
# 返回 [{'error_code': int, 'error_register': int, 'data': bytes, 'timestamp_ms': int}, ...]
for msg in history:
print(f"EMCY 0x{msg['error_code']:04X} 寄存器=0x{msg['error_register']:02X}")
# 清除 EMCY 历史
slave.clear_emcy()
方式二: 通过 CoEEmcyRecorder
CoEEmcyRecorder 提供更丰富的 EMCY 历史记录管理,包含 EmergencyMessage 结构体和错误描述。
from ethercat.slave import EmergencyMessage, CoEEmcyRecorder
# CoEEmcyRecorder 通常由事件系统自动管理
# 手动创建示例:
recorder = CoEEmcyRecorder(slave_index=1)
EmergencyMessage 结构
class EmergencyMessage:
error_code: int # 紧急错误代码 (CiA 301 Table 24)
error_register: int # 错误寄存器 (对象 0x1001)
data: bytes # 厂商特定数据 (5 字节)
slave_index: int # 来源从站编号
timestamp: datetime # 接收时间
def get_error_description(self) -> str
# 根据 CiA 301 标准错误代码高字节返回分类描述
get_error_description() 返回中文描述,如 "电流错误"、"温度错误"、"通信错误" 等。
CoEEmcyRecorder API
# 获取 EMCY 历史记录 (返回副本)
history = recorder.get_emergency_history() # -> List[EmergencyMessage]
# 清除历史记录
recorder.clear_emergency_history()
# 当前记录数量
count = recorder.count
# 最大历史容量 (默认 256,类常量)
max_size = CoEEmcyRecorder.MAX_HISTORY_SIZE
示例:
# 读取 EMCY 历史
history = recorder.get_emergency_history()
for msg in history:
print(f"EMCY 0x{msg.error_code:04X}: {msg.get_error_description()} "
f"寄存器=0x{msg.error_register:02X} "
f"时间={msg.timestamp:%H:%M:%S.%f}")
# 清除历史
recorder.clear_emergency_history()
高级功能: 对象字典树 (ODList)
ODList 类提供完整的对象字典树遍历功能,自动加载从站 OD 结构。
ODList(master._dll, master.master_index, ...) 属于内部高级 API, 仅在需要完整 OD 树遍历 (诊断工具/HMI 浏览) 时使用。常规 SDO 读写通过 slave.coe.sdo_read_value() / sdo_write_value() 即可, 无需加载完整 OD。
高级 API 的句柄参数后续可能调整, 请勿在生产代码中长期固化该构造形式。
from ethercat import ODList
# 加载从站对象字典
od = ODList(master._dll, master.master_index, 1, slave.coe)
if od.loaded:
# 遍历所有索引
for obj in od:
print(f"0x{obj.index:04X}: {obj.name} (subs={len(obj)})")
# 遍历子索引
for entry in obj:
print(f" [{entry.subindex}] {entry.name}: {entry.data_type.name}")
# 按索引访问
if 0x6041 in od:
entry = od[0x6041][0]
value = entry.read_u16()
print(f"StatusWord = 0x{value:04X}")
copy()
def copy(self) -> 'ODList'
创建对象字典的缓存副本(结构 + 值的快照)。副本中的值为快照,不再实时读取。
示例:
# 创建 ODList 快照
od_snapshot = od.copy()
# 创建单个 ObjectDictionary 的快照
if 0x6040 in od:
od_copy = od[0x6040].copy()
ObjectDictionary 属性
index(int) — 对象索引号name(str) — 对象名称data_type(int) — 数据类型标识object_code(int) — 对象代码(变量/数组/记录)len(od)(int) — 子对象数量
ObjectDictionary 方法
# 按子索引获取 ObjectEntry
entry = od[0x6040][0]
# 按名称获取
entry = od[0x1018]["Vendor ID"]
# 批量读取所有子索引
data_dict = od[0x1018].read_all() # -> Dict[int, bytes]
# SDO 写入整个对象
od[0x6040].sdo_write(data, use_complete_access=False)
od[0x6040].sdo_write(subindex, data, use_complete_access=False)
ObjectEntry 类型化读写
通过 ODList 获取的 ObjectEntry 对象支持类型化读写方法:
read_u8()/write_u8(v)— 无符号 8 位read_u16()/write_u16(v)— 无符号 16 位read_u32()/write_u32(v)— 无符号 32 位read_u64()/write_u64(v)— 无符号 64 位read_i8()/write_i8(v)— 有符号 8 位read_i16()/write_i16(v)— 有符号 16 位read_i32()/write_i32(v)— 有符号 32 位read_i64()/write_i64(v)— 有符号 64 位read_f32()/write_f32(v)— 32 位浮点read_f64()/write_f64(v)— 64 位浮点read_string()/write_string(v)— 字符串read_typed()— 根据数据类型自动解析
ObjectEntry 属性
subindex(int) — 子索引name(str) — 名称data_type(EcDataType) — 数据类型bit_length(int) — 位长度obj_access(ObjAccess) — 访问权限标志bytes(bytes) — 原始字节读写can_read(bool) — 是否可读can_write(bool) — 是否可写is_read_only(bool) — 是否只读access_description(str) — 权限描述文字(中文)
示例:
entry = od[0x6040][0]
if entry.can_write:
entry.write_u16(0x0006)
print(f"访问权限: {entry.access_description}")
SDOError 枚举
SDO 操作错误代码(CANopen 标准 / ETG.1020)。SDOError 提供 describe() 方法获取中文描述。
from ethercat.data.types import SDOError
| 名称 | 值 | 说明 |
|---|---|---|
| NO_ERROR | 0x00000000 | 无错误 |
| TOGGLE_BIT_NOT_CHANGED | 0x05030000 | Toggle 位未改变 |
| SDO_PROTOCOL_TIMEOUT | 0x05040000 | SDO 协议超时 |
| INVALID_COMMAND_SPECIFIER | 0x05040001 | 无效的命令标识符 |
| OUT_OF_MEMORY | 0x05040005 | 内存不足 |
| UNSUPPORTED_ACCESS | 0x06010000 | 不支持的访问 |
| READ_WRITE_ONLY_ERROR | 0x06010001 | 尝试读取只写对象 |
| WRITE_READ_ONLY_ERROR | 0x06010002 | 尝试写入只读对象 |
| SUBINDEX_WRITE_ERROR | 0x06010003 | 子索引不允许写入 |
| COMPLETE_ACCESS_NOT_SUPPORTED | 0x06010004 | 不支持完全访问 |
| OBJECT_LENGTH_EXCEEDED | 0x06010005 | 对象长度超限 |
| OBJECT_MAPPED_TO_RXPDO | 0x06010006 | 对象已映射到 RxPDO |
| OBJECT_DOES_NOT_EXIST | 0x06020000 | 对象不存在 |
| CANNOT_BE_MAPPED_TO_PDO | 0x06040041 | 不可映射到 PDO |
| EXCEEDS_PDO_LENGTH | 0x06040042 | 超出 PDO 长度 |
| PARAMETER_INCOMPATIBILITY | 0x06040043 | 参数不兼容 |
| INTERNAL_INCOMPATIBILITY | 0x06040047 | 内部不兼容 |
| HARDWARE_ACCESS_ERROR | 0x06060000 | 硬件访问错误 |
| DATA_TYPE_MISMATCH | 0x06070010 | 数据类型不匹配 |
| DATA_TYPE_TOO_HIGH | 0x06070012 | 数据类型长度过大 |
| DATA_TYPE_TOO_LOW | 0x06070013 | 数据类型长度过小 |
| SUBINDEX_DOES_NOT_EXIST | 0x06090011 | 子索引不存在 |
| VALUE_RANGE_EXCEEDED | 0x06090030 | 值超出范围 |
| VALUE_TOO_HIGH | 0x06090031 | 值过大 |
| VALUE_TOO_LOW | 0x06090032 | 值过小 |
| MODULE_IDENT_LIST_MISMATCH | 0x06090033 | 模块列表不匹配 |
| MAX_LESS_THAN_MIN | 0x06090036 | 最大值小于最小值 |
| GENERAL_ERROR | 0x08000000 | 一般错误 |
| DATA_TRANSFER_ERROR | 0x08000020 | 数据传输错误 |
| LOCAL_CONTROL_ERROR | 0x08000021 | 本地控制错误 |
| DEVICE_STATE_ERROR | 0x08000022 | 设备状态错误 |
| DICTIONARY_ERROR | 0x08000023 | 字典错误 |
| UNKNOWN_ERROR | 0xFFFFFFFF | 未知错误 |
EcDataType 枚举
EtherCAT 数据类型枚举(基于 ETG.1000.6),决定 ObjectEntry 类型化读写的自动转换目标。
from ethercat.slave.coe import EcDataType
| 枚举值 | 值 | Python 类型 | 说明 |
|---|---|---|---|
| BOOLEAN | 0x0001 | bool | 布尔 |
| INTEGER8 | 0x0002 | int | int8 |
| INTEGER16 | 0x0003 | int | int16 |
| INTEGER32 | 0x0004 | int | int32 |
| UNSIGNED8 | 0x0005 | int | uint8 |
| UNSIGNED16 | 0x0006 | int | uint16 |
| UNSIGNED32 | 0x0007 | int | uint32 |
| REAL32 | 0x0008 | float | 单精度浮点 |
| VISIBLE_STRING | 0x0009 | str | ASCII 字符串 |
| OCTET_STRING | 0x000A | bytes | 字节串 |
| UNICODE_STRING | 0x000B | str | Unicode 字符串 |
| REAL64 | 0x0011 | float | 双精度浮点 |
| INTEGER64 | 0x0015 | int | int64 |
| UNSIGNED64 | 0x001B | int | uint64 |
EcDataType 辅助属性
# 获取类型对应的字节大小
size = EcDataType.UNSIGNED16.byte_size # 2
# 获取 struct 格式字符串
fmt = EcDataType.INTEGER32.struct_fmt # '<i'
ObjAccess 访问权限标志
from ethercat.slave.coe import ObjAccess
# 检查权限
if entry.obj_access & ObjAccess.READABLE:
print("可读")
if entry.obj_access & ObjAccess.WRITE_OP:
print("OP 状态可写")
# 获取中文描述
print(entry.obj_access.access_description) # "读写(PreOP,SafeOP,OP可写)"
| 标志 | 值 | 说明 |
|---|---|---|
| NONE | 0x0000 | 无权限 |
| READ_PRE | 0x0001 | PreOp 可读 |
| READ_SAFE | 0x0002 | SafeOp 可读 |
| READ_OP | 0x0004 | OP 可读 |
| WRITE_PRE | 0x0008 | PreOp 可写 |
| WRITE_SAFE | 0x0010 | SafeOp 可写 |
| WRITE_OP | 0x0020 | OP 可写 |
| READABLE | 0x0007 | 所有状态可读 |
| WRITABLE | 0x0038 | 所有状态可写 |
完整示例
SDO 读写
from ethercat import EtherCATMaster, EcState
with EtherCATMaster() as master:
master.set_network(r"\\Device\\NPF_{GUID}")
master.set_state(EcState.OP)
master.start()
slave = master[1]
# 读取设备信息
vendor_id = slave.coe.sdo_read_value(0x1018, 1, dtype='u32')
product_code = slave.coe.sdo_read_value(0x1018, 2, dtype='u32')
print(f"VendorID=0x{vendor_id:08X}, ProductCode=0x{product_code:08X}")
# 读取状态字
status = slave.coe.sdo_read_value(0x6041, 0, dtype='u16')
print(f"StatusWord = 0x{status:04X}")
# 写入控制字
slave.coe.sdo_write_value(0x6040, 0, 0x0006, dtype='u16')
# 设置操作模式
slave.coe.sdo_write_value(0x6060, 0, 8, dtype='i8')
# 错误处理
data = slave.coe.sdo_read(0x9999, 0)
if data is None:
error = slave.coe.last_sdo_error
print(f"SDO 错误: {error.name} - {error.describe()}")
master.stop()
对象字典扫描
from ethercat import ODList
od = ODList(master._dll, master.master_index, 1, slave.coe)
if od.loaded:
for obj in od:
print(f"0x{obj.index:04X}: {obj.name}")
for entry in obj:
value = entry.read_typed()
print(f" [{entry.subindex}] {entry.name} = {value}")
# 创建快照副本
snapshot = od.copy()
EMCY 历史记录
# 方式一: 通过从站属性
count = slave.emcy_count
history = slave.emcy_history
for msg in history:
print(f"EMCY 0x{msg['error_code']:04X}")
slave.clear_emcy()
# 方式二: 通过 CoEEmcyRecorder
from ethercat.slave import CoEEmcyRecorder
recorder = CoEEmcyRecorder(slave_index=1)
for msg in recorder.get_emergency_history():
print(f"EMCY 0x{msg.error_code:04X}: {msg.get_error_description()}")
recorder.clear_emergency_history()
CoE 诊断历史 (0x10F3)
ETG.1020 §16 定义了 CoE 诊断历史对象 0x10F3, 从站通过环形缓冲区保存
运行期诊断消息 (Octet-Diagnosis). SDK 提供轻量级的运行时监听接口,
不需要一次性拉取全部历史记录, 适用于 PLC 循环中快速轮询新事件。
read_diagnostic_messages() 的区别read_diagnostic_messages()— 一次性把整个 0x10F3 缓冲区拉回主站, 开销较大poll_has_new_diagnostic()+read_diagnostic_message(idx)— 只拉发生变化的单条消息, 适合运行时周期轮询
poll_has_new_diagnostic()
def poll_has_new_diagnostic(self) -> int
快速轮询 0x10F3:04 NewMessagesAvailable 标志, 判断是否有尚未读取的诊断消息。
返回值:
1— 有新消息 (建议紧跟read_diagnostic_meta()+read_diagnostic_message())0— 无新消息-1— 通信失败 (可回退到read_diagnostic_messages())
示例:
rc = slave.coe.poll_has_new_diagnostic()
if rc == 1:
print("有新诊断消息")
read_diagnostic_meta()
def read_diagnostic_meta(self) -> Optional[DiagMeta]
读取 0x10F3:01-05 元数据块, 返回 DiagMeta 数据类, 失败返回 None。
相关结构:
@dataclass
class DiagMeta:
max_messages: int = 0 # 0x10F3:01, 从站诊断缓冲区容量 (通常 <=250)
newest_message: int = 0 # 0x10F3:02, 最新消息在 0x10F3 中的 subindex
newest_acknowledged: int = 0 # 0x10F3:03, 最后确认的 subindex
flags: int = 0 # 0x10F3:05, bit4=Ring/Linear, bit5=Overrun
flags 位定义:
bit4 (0x10)— 1 = Ring Buffer, 0 = Linearbit5 (0x20)— 1 = 发生过 Overrun (有消息被覆盖)
read_diagnostic_message()
def read_diagnostic_message(self, msg_subidx: int,
buf_size: int = 1024) -> Optional[bytes]
读取 0x10F3:msg_subidx 指定一条 Octet-Diagnosis 原始字节。
参数:
msg_subidx(int) — subindex 范围[6, 255](ETG.1020)buf_size(int) — 接收缓冲字节数, 默认 1024
返回值:
bytes | None— 原始字节 (实际长度); 失败返回None
acknowledge_diagnostic()
def acknowledge_diagnostic(self, msg_subidx: int) -> bool
写 0x10F3:03 = msg_subidx 确认已处理该消息 (Ring Buffer 推进)。
应用层处理完一条消息后应及时确认, 否则 newest_acknowledged 不推进,
可能导致 Overrun。
示例: 周期性轮询监听
while running:
if slave.coe.poll_has_new_diagnostic() == 1:
meta = slave.coe.read_diagnostic_meta()
if meta is None:
continue
# 检查 Ring Buffer / Overrun
is_ring = bool(meta.flags & 0x10)
has_overrun = bool(meta.flags & 0x20)
if has_overrun:
print(f"警告: 诊断缓冲区溢出, 部分消息丢失")
# 从 newest_acknowledged+1 读到 newest_message
start = (meta.newest_acknowledged + 1) if meta.newest_acknowledged >= 6 else 6
end = meta.newest_message
if start <= end:
for idx in range(start, end + 1):
raw = slave.coe.read_diagnostic_message(idx)
if raw:
print(f"[0x10F3:{idx:02X}] {raw.hex()}")
slave.coe.acknowledge_diagnostic(idx)
time.sleep(0.1) # 100ms 轮询
0x10F3 诊断历史与 CiA 301 EMCY 是两套独立机制:
- EMCY 通过邮箱 主动推送 致命错误 (从站异步发送)
- 0x10F3 由主站 主动轮询 读取历史记录 (从站被动响应)
两者可以共存, 对应不同诊断层级。