SoE (Servo over EtherCAT)
SoE 协议将 SERCOS 伺服通信协议封装在 EtherCAT 邮箱中,适用于 SERCOS 兼容的伺服驱动器。
通过 slave.soe 访问。从站不支持 SoE 时为 None。
大多数 EtherCAT 伺服驱动器使用 CoE + CiA 402 协议栈。 SoE 仅用于 SERCOS 兼容 的驱动器(如 Bosch Rexroth IndraDrive 系列)。 两者不能混用——从站支持哪种取决于硬件。
SERCOS IDN 体系
SoE 通过 IDN(Identification Number) 寻址驱动器参数。每个 IDN 是一个 16 位编号,代表一个参数(位置、速度、控制字等)。
IDN 命名规则:
| 前缀 | 范围 | 说明 |
|---|---|---|
| S-x-xxxx | 0x0000 - 0x7FFF | SERCOS 标准参数(所有厂商通用) |
| P-x-xxxx | 0x8000 - 0xBFFF | 产品特定参数 |
| — | 0xC000 - 0xFFFF | 厂商自定义参数 |
常用标准 IDN(伺服控制相关):
| IDN | SERCOS 名 | 说明 | 数据类型 |
|---|---|---|---|
| S-0-0001 (0x0001) | Cycle Time | 通信周期时间 | uint, us |
| S-0-0011 (0x000B) | Position Feedback 1 | 实际位置 | int, inc |
| S-0-0012 (0x000C) | Position Command | 目标位置 | int, inc |
| S-0-0013 (0x000D) | Velocity Feedback | 实际速度 | int, rpm |
| S-0-0014 (0x000E) | Velocity Command | 目标速度 | int, rpm |
| S-0-0016 (0x0010) | AT Config | 输入映射配置列表 | list |
| S-0-0019 (0x0013) | Drive Status | 驱动器状态字 | int |
| S-0-0024 (0x0018) | MDT Config | 输出映射配置列表 | list |
| S-0-0036 (0x0024) | Max Velocity | 最大速度限制 | uint, rpm |
| S-0-0040 (0x0028) | Homing Velocity | 回零速度 | uint, rpm |
| S-0-0064 (0x0040) | Drive Control Word | 驱动器控制字 | int |
| S-0-0071 (0x0047) | Drive Status Word | 驱动器状态字 | int |
元素标志(Element Flags)— 读取 IDN 的不同"层面":
| 标志 | 含义 |
|---|---|
| 0x01 | 数据状态 |
| 0x02 | 参数名称(字符串) |
| 0x04 | 属性(数据类型/长度/权限位域) |
| 0x08 | 单位(字符串) |
| 0x10 | 最小值 |
| 0x20 | 最大值 |
| 0x40 | 数据值(默认,最常用) |
| 0x80 | 默认值 |
SoEElementFlags 位标志
SoEElementFlags 是 ETG.1000.6 §5.11 SoEHeader 的 7 位 element 位掩码 (IntFlag), 多位可 OR 组合一次读 / 写多个层面.
from ethercat.slave.soe import SoEElementFlags
| 标志 | 值 | 含义 |
|---|---|---|
| NONE | 0x00 | 无 |
| DATA_STATUS | 0x01 | 数据状态字 (valid/invalid/busy) |
| NAME | 0x02 | IDN 名称 (字符串) |
| ATTRIBUTE | 0x04 | 属性 (类型 / 长度 / 小数位等) |
| UNIT | 0x08 | 单位字符串 |
| MIN | 0x10 | 最小值 |
| MAX | 0x20 | 最大值 |
| VALUE | 0x40 | 操作数据值 (默认, 最常用) |
| DEFAULT | 0x80 | 默认值 |
示例:
from ethercat.slave.soe import SoEElementFlags
# 一次性读取 Name + Unit + Value
flags = SoEElementFlags.NAME | SoEElementFlags.UNIT | SoEElementFlags.VALUE
data = slave.soe.read(0x000B, int(flags))
SERCOS IDN 编 / 解码
encode_idn()
def encode_idn(is_standard: bool, parameter_set: int, data_block: int) -> int
把 (is_standard, set, block) 三元组编码为 16 位 IDN.
is_standard=True→ bit15=0 (S 标准参数)is_standard=False→ bit15=1 (P 厂商参数)parameter_set取 0..7 → bit14..12data_block取 0..0xFFF → bit11..0
示例:
from ethercat.slave.soe import encode_idn
# S-0-0040 (Velocity command value)
idn = encode_idn(True, 0, 40) # -> 0x0028
# P-1-0123
idn = encode_idn(False, 1, 0x123) # -> 0x9123
decode_idn()
def decode_idn(idn: int) -> Tuple[bool, int, int]
把 16 位 IDN 拆回 (is_standard, parameter_set, data_block) 三元组.
示例:
from ethercat.slave.soe import decode_idn
is_std, set_no, block = decode_idn(0x8000)
# is_std=False, set_no=0, block=0 (即 P-0-0000)
try_parse_sercos_idn()
def try_parse_sercos_idn(text: str) -> Optional[int]
把 SERCOS 标准字符串 ("S-0-0040" / "P-1-0123") 解析为 16 位 IDN, 失败返回 None.
示例:
from ethercat.slave.soe import try_parse_sercos_idn
idn = try_parse_sercos_idn("S-0-0040") # -> 0x0028
idn = try_parse_sercos_idn("invalid") # -> None
format_sercos_idn()
def format_sercos_idn(idn: int) -> str
把 16 位 IDN 格式化为 "S-x-xxxx" / "P-x-xxxx" 字符串.
示例:
from ethercat.slave.soe import format_sercos_idn
text = format_sercos_idn(0x0028) # -> "S-0-0040"
text = format_sercos_idn(0x9123) # -> "P-1-0291"
基本读写
read()
def read(self, idn: int, element_flags: int = 0x40) -> Optional[bytes]
读取 IDN 参数原始字节。使用构造时设置的超时时间(默认 5000ms)。
示例:
import struct
# 读取通信周期时间 (S-0-0001)
data = slave.soe.read(0x0001, 0x40)
if data is not None:
cycle_time = struct.unpack('<I', data)[0]
print(f"周期时间: {cycle_time} us")
write()
def write(self, idn: int, data: bytes, element_flags: int = 0x40) -> bool
写入 IDN 参数。使用构造时设置的超时时间(默认 5000ms)。
示例:
import struct
# 设置目标位置 (S-0-0012)
position = struct.pack('<i', 100000)
success = slave.soe.write(0x000C, position)
类型化读取
所有方法签名一致:read_xxx(idn, element_flags=0x40)
| 方法 | 返回类型 | 说明 |
|---|---|---|
| read_int16 | Optional[int] | 读取 16 位有符号整数 |
| read_int32 | Optional[int] | 读取 32 位有符号整数 |
| read_uint16 | Optional[int] | 读取 16 位无符号整数 |
| read_uint32 | Optional[int] | 读取 32 位无符号整数 |
| read_float | Optional[float] | 读取单精度浮点数 |
| read_double | Optional[float] | 读取双精度浮点数 |
| read_string | Optional[str] | 读取字符串 |
示例:
# 读取实际位置 (S-0-0011)
actual_pos = slave.soe.read_int32(0x000B)
# 读取驱动器状态字 (S-0-0071)
status = slave.soe.read_uint16(0x0047)
# 读取参数名称
name = slave.soe.read_string(0x000B, element_flags=0x02)
类型化写入
所有方法签名一致:write_xxx(idn, value, element_flags=0),返回 bool。
| 方法 | 值类型 | 说明 |
|---|---|---|
| write_int16 | int | 写入 16 位有符号整数 |
| write_int32 | int | 写入 32 位有符号整数 |
| write_uint16 | int | 写入 16 位无符号整数 |
| write_uint32 | int | 写入 32 位无符号整数 |
示例:
# 设置目标位置 (S-0-0012)
slave.soe.write_int32(0x000C, 50000)
# 写控制字 (S-0-0064)
slave.soe.write_uint16(0x0040, 0x0006)
元数据读取
def read_name(self, idn: int) -> Optional[str]
def read_unit(self, idn: int) -> Optional[str]
def read_min_value(self, idn: int) -> Optional[bytes]
def read_max_value(self, idn: int) -> Optional[bytes]
def read_default_value(self, idn: int) -> Optional[bytes]
示例:
name = slave.soe.read_name(0x000B)
unit = slave.soe.read_unit(0x000B)
min_val = slave.soe.read_min_value(0x000B)
max_val = slave.soe.read_max_value(0x000B)
print(f"参数: {name} ({unit})")
if min_val and max_val:
print(f"范围: {struct.unpack('<i', min_val)[0]} - {struct.unpack('<i', max_val)[0]}")
参数信息
get_available_idns()
def get_available_idns(self) -> List[int]
获取从站支持的所有 IDN 列表。
get_parameter_info()
def get_parameter_info(self, idn: int) -> SoEParameter
获取 IDN 参数的完整信息(名称、单位、属性、当前值、最小/最大值等)。
返回值:
SoEParameter— 包含完整的参数信息
SoEParameter 类:
class SoEParameter:
idn: int # IDN 编号
name: str # 参数名称
unit: str # 单位
attributes: SoEAttributes # 属性(数据类型、长度等)
value: bytes # 当前值
default_value: bytes # 默认值
min_value: bytes # 最小值
max_value: bytes # 最大值
sercos_idn: str # SERCOS IDN 格式(如 "S-0-0001")
category: str # 类别("Standard" / "Product" / "Vendor")
is_read_only: bool # 是否只读
access_mode: str # 访问权限("RO" / "RW" / "RW*")
formatted_value: str # 格式化的当前值
formatted_min_value: str # 格式化的最小值
formatted_max_value: str # 格式化的最大值
SoEAttributes 结构:
class SoEAttributes:
evaluation_factor: int # 评价因子
length: int # 数据长度
is_list: bool # 是否为列表
is_command: bool # 是否为命令
data_type: SoEDataType # 数据类型
decimals: int # 小数位数
write_protected_preop: bool # PreOp 状态写保护
write_protected_safeop: bool # SafeOp 状态写保护
write_protected_op: bool # Op 状态写保护
SoEDataType 枚举:
class SoEDataType(IntEnum):
BINARY = 0 # 二进制
UINT = 1 # 无符号整数
INT = 2 # 有符号整数
HEXADECIMAL = 3 # 十六进制
STRING = 4 # 字符串
IDN = 5 # IDN 引用
FLOAT = 6 # 浮点数
PARAMETER = 7 # 参数
示例:
param = slave.soe.get_parameter_info(0x000B)
print(f"IDN: {param.sercos_idn} - {param.name}")
print(f"单位: {param.unit}, 访问: {param.access_mode}")
print(f"当前值: {param.formatted_value}")
print(f"范围: {param.formatted_min_value} - {param.formatted_max_value}")
AT/MDT 映射
SERCOS 使用 AT(Acknowledge Telegram)和 MDT(Master Data Telegram)传输周期数据:
- AT = 从站到主站的周期输入(实际位置、实际速度、状态字...)
- MDT = 主站到从站的周期输出(目标位置、目标速度、控制字...)
映射配置决定了哪些 IDN 参数被包含在周期数据帧中。
get_idn_mapping()
def get_idn_mapping(self) -> SoEMappingInfo
读取当前驱动器的 AT/MDT 映射配置。
返回值:
SoEMappingInfo— 映射信息对象
SoEMappingInfo 类:
class SoEMappingInfo:
at_mapping: List[SoEMappingEntry] # AT(输入)映射列表
mdt_mapping: List[SoEMappingEntry] # MDT(输出)映射列表
at_bit_size: int # AT 总位数
mdt_bit_size: int # MDT 总位数
at_byte_size: int # AT 总字节数
mdt_byte_size: int # MDT 总字节数
SoEMappingEntry 类:
class SoEMappingEntry:
idn: int # IDN 编号
name: str # 参数名称
bit_length: int # 数据长度(位)
byte_length: int # 数据长度(字节)
get_all_drive_mappings()
def get_all_drive_mappings(self) -> Dict[int, SoEMappingInfo]
获取所有驱动器的映射信息(最多扫描 8 个驱动器)。
返回值:
Dict[int, SoEMappingInfo]— 驱动器编号到映射信息的字典
AT/MDT 映射查询通过 SoEAdvanced 类提供(继承自 SoE,位于 soe.py 模块),属于内部高级 API, 仅在需要 IDN 周期数据布局扫描时使用。常规 IDN 读写/程序命令走 slave.soe (基础接口) 即可。
高级 API 的句柄参数后续可能调整, 请勿在生产代码中长期固化该构造形式。
from ethercat import SoEAdvanced
soe_adv = SoEAdvanced(master._dll, master.master_index, slave.slave_num)
mapping = soe_adv.get_idn_mapping()
print(f"AT 映射(输入 {mapping.at_byte_size} 字节):")
for entry in mapping.at_mapping:
print(f" IDN 0x{entry.idn:04X}: {entry.name} ({entry.byte_length}B)")
print(f"MDT 映射(输出 {mapping.mdt_byte_size} 字节):")
for entry in mapping.mdt_mapping:
print(f" IDN 0x{entry.idn:04X}: {entry.name} ({entry.byte_length}B)")
程序命令
execute_command()
def execute_command(self, idn: int, timeout_ms: int = 5000, poll_interval_ms: int = 50) -> bool
执行 SoE 程序命令(SERCOS Procedure Command)。
示例:
# 执行回零命令 (S-0-0148)
success = slave.soe.execute_command(0x0094)
if success:
print("回零完成")
# 自定义超时(长时间命令)
ok = slave.soe.execute_command(0x0094, timeout_ms=30000, poll_interval_ms=100)
read_data_state()
def read_data_state(self, idn: int) -> Optional[int]
读取 IDN 的 Data State 元素(ElementFlag 0x01)。Data State 包含命令状态、数据有效性等信息。
参数:
idn(int) — IDN 编号
返回值:
int | None— Data State 值(uint16),失败返回None
示例:
# 读取 IDN 的 Data State
state = slave.soe.read_data_state(0x0094)
if state is not None:
command_active = (state & 0x01) != 0
command_error = (state & 0x02) != 0
print(f"命令激活: {command_active}, 命令错误: {command_error}")
参数变化通知
SoE Notification 功能允许监控 SERCOS 从站 IDN 参数的变化。当参数值发生改变时,通过回调通知应用程序。
实现机制:
- 硬件通知:通过写入 S-0-0127(通知请求 IDN)尝试启用从站的原生通知
- 轮询检测:如果从站不支持硬件通知,自动回退到轮询模式
on_notification()
def on_notification(self, callback: Callable[[SoENotificationEventArgs], None])
注册参数变化通知回调函数。
enable_notification()
def enable_notification(self, idn: int, timeout_ms: int = 500) -> bool
启用指定 IDN 的参数变化通知。首先尝试通过 S-0-0127 启用硬件通知,无论是否成功都会将 IDN 加入轮询监控列表。
返回值:
bool—True表示从站确认支持硬件通知;False表示回退到轮询模式
disable_notification()
def disable_notification(self, idn: int)
禁用指定 IDN 的参数变化通知。
disable_all_notifications()
def disable_all_notifications(self)
禁用所有已启用的通知,清空监控列表。
poll_notifications()
def poll_notifications(self, timeout_ms: int = 200) -> int
轮询检测所有已监控 IDN 的参数变化。发现变化时触发已注册的回调。
返回值:
int— 本次轮询检测到变化的 IDN 数量
monitored_idns 属性
@property
def monitored_idns(self) -> List[int]
获取当前正在监控的 IDN 列表。
SoENotificationEventArgs
class SoENotificationEventArgs:
slave_index: int # 从站索引
drive_number: int # 驱动器编号
idn: int # 变化的 IDN 编号
new_value: bytes # 新值
old_value: bytes # 旧值
sercos_idn: str # 格式化的 SERCOS IDN 名称 (如 "S-0-0127")
示例:
if slave.soe is not None:
# 注册通知回调
def on_change(args):
print(f"参数变化: {args.sercos_idn} (IDN 0x{args.idn:04X})")
print(f" 旧值: {args.old_value.hex()}")
print(f" 新值: {args.new_value.hex()}")
slave.soe.on_notification(on_change)
# 启用监控实际位置 (S-0-0011)
hw_supported = slave.soe.enable_notification(0x000B)
print(f"硬件通知: {'支持' if hw_supported else '回退到轮询'}")
# 启用监控驱动器状态字 (S-0-0071)
slave.soe.enable_notification(0x0047)
# 周期性轮询检测
changed = slave.soe.poll_notifications()
if changed > 0:
print(f"检测到 {changed} 个参数变化")
# 查看已监控的 IDN
print(f"监控列表: {slave.soe.monitored_idns}")
# 停止监控
slave.soe.disable_all_notifications()
全局 Notification / Emergency 分发器
除了上述 SoE 实例级的 on_notification() / enable_notification() 回调,
SDK 还提供 全局单实例分发器, 统一监听所有从站的 SoE Notification (OpCode=5)
与 SoE Emergency (OpCode=6) 事件。
分发器在底层邮箱接收线程中同步触发, 多个 Python 回调按注册顺序执行。
- 全局日志记录: 一个回调统一记录所有 SoE 从站的参数变化
- Emergency 告警系统: SERCOS 驱动器发送致命错误帧 (OpCode=6) 时统一上报
- 多从站场景: 避免为每个
slave.soe实例重复注册回调
SoENotificationEventArgs
全局分发器的 Notification 参数:
@dataclass
class SoENotificationEventArgs:
slave_index: int # 从站索引
drive_number: int # 驱动器编号 (多轴设备)
element_flags: int # Element 标志 (0x40=Value / 0x02=Name / 0x04=Attribute ...)
idn: int # IDN 编号
new_value: Optional[bytes] # 新值
old_value: Optional[bytes] # 旧值 (可能为 None)
timestamp: datetime # 接收时间 (UTC)
@property
def sercos_idn(self) -> str # "S-0-0127" 格式
SoEEmergencyEventArgs
SERCOS Emergency 帧 (OpCode=6) 参数, 由从站异步推送的致命错误:
@dataclass
class SoEEmergencyEventArgs:
slave_index: int # 从站索引
drive_number: int # 驱动器编号
error_code: int # Sercos Emergency Error Code (uint16)
timestamp: datetime # 接收时间 (UTC)
注册 / 移除回调
from ethercat.slave.soe import (
add_soe_notification_callback,
remove_soe_notification_callback,
add_soe_emergency_callback,
remove_soe_emergency_callback,
)
def add_soe_notification_callback(
callback: Callable[[SoENotificationEventArgs], None]
) -> None
def remove_soe_notification_callback(
callback: Callable[[SoENotificationEventArgs], None]
) -> None
def add_soe_emergency_callback(
callback: Callable[[SoEEmergencyEventArgs], None]
) -> None
def remove_soe_emergency_callback(
callback: Callable[[SoEEmergencyEventArgs], None]
) -> None
示例 — 全局 Notification 监听:
from ethercat.slave.soe import add_soe_notification_callback
def on_notif(e):
print(f"[{e.timestamp:%H:%M:%S}] Slave {e.slave_index} "
f"IDN {e.sercos_idn} (0x{e.idn:04X}) "
f"element=0x{e.element_flags:02X} 变化")
if e.new_value:
print(f" new = {e.new_value.hex()}")
add_soe_notification_callback(on_notif)
示例 — Emergency 告警:
from ethercat.slave.soe import add_soe_emergency_callback
def on_emcy(e):
print(f"[EMCY] Slave {e.slave_index} drive={e.drive_number} "
f"error=0x{e.error_code:04X} at {e.timestamp:%H:%M:%S.%f}")
add_soe_emergency_callback(on_emcy)
Python 回调在 底层邮箱接收线程 中同步执行:
- 不要长时间阻塞 — 应快速返回, 否则后续 SoE 邮箱会被延迟处理
- 访问 Tk / WinForms / Qt 控件需要跨线程 — 使用
queue.Queue或各框架的after/invoke转主线程 - 异常会被 SDK 吞掉 — 确保回调内部捕获异常并自行记录日志
import queue
ui_queue: queue.Queue = queue.Queue()
def on_emcy(e):
try:
ui_queue.put_nowait(("emcy", e))
except queue.Full:
pass # UI 慢, 丢弃
标准参考: IEC 61800-7-204 (SERCOS over EtherCAT) §Service Channel Communication.
标准 IDN 常量
class StandardIDN:
IDN_LIST = 0x0000 # IDN 列表 (S-0-0000)
CYCLE_TIME = 0x0001 # 通信周期时间 (S-0-0001)
AT_CONFIG = 0x0010 # AT 配置 - 输入映射 (S-0-0016)
MDT_CONFIG = 0x0018 # MDT 配置 - 输出映射 (S-0-0024)
NOTIFICATION_REQUEST = 0x007F # 通知请求 (S-0-0127)
完整示例
参数扫描
if slave.soe is not None:
idns = slave.soe.get_available_idns()
print(f"从站支持 {len(idns)} 个 IDN 参数\n")
for idn in idns[:10]:
info = slave.soe.get_parameter_info(idn)
print(f"IDN 0x{info['idn']:04X}: {info['name']} ({info['unit']})")
伺服驱动器控制
if slave.soe is not None:
# 1. 读取驱动器状态
status_word = slave.soe.read_uint16(0x0047)
print(f"状态字: 0x{status_word:04X}")
# 2. 查看 AT/MDT 映射(了解周期数据布局)
from ethercat import SoEAdvanced
soe_adv = SoEAdvanced(master._dll, master.master_index, slave.slave_num)
mapping = soe_adv.get_idn_mapping()
print(f"\nAT 映射(输入 {mapping.at_byte_size} 字节):")
for entry in mapping.at_mapping:
print(f" IDN 0x{entry.idn:04X}: {entry.name} ({entry.byte_length}B)")
print(f"\nMDT 映射(输出 {mapping.mdt_byte_size} 字节):")
for entry in mapping.mdt_mapping:
print(f" IDN 0x{entry.idn:04X}: {entry.name} ({entry.byte_length}B)")
# 3. 写控制字使能驱动器 (S-0-0064)
slave.soe.write_uint16(0x0040, 0x0006)
# 4. 设置目标位置 (S-0-0012)
slave.soe.write_int32(0x000C, 50000)
# 5. 读取实际位置 (S-0-0011)
actual_pos = slave.soe.read_int32(0x000B)
print(f"\n实际位置: {actual_pos} inc")