跳到主要内容

SoE (Servo over EtherCAT)

SoE 协议将 SERCOS 伺服通信协议封装在 EtherCAT 邮箱中,适用于 SERCOS 兼容的伺服驱动器。

通过 slave.soe 访问。从站不支持 SoE 时为 None

SoE vs CoE

大多数 EtherCAT 伺服驱动器使用 CoE + CiA 402 协议栈。 SoE 仅用于 SERCOS 兼容 的驱动器(如 Bosch Rexroth IndraDrive 系列)。 两者不能混用——从站支持哪种取决于硬件。

SERCOS IDN 体系

SoE 通过 IDN(Identification Number) 寻址驱动器参数。每个 IDN 是一个 16 位编号,代表一个参数(位置、速度、控制字等)。

IDN 命名规则:

前缀范围说明
S-x-xxxx0x0000 - 0x7FFFSERCOS 标准参数(所有厂商通用)
P-x-xxxx0x8000 - 0xBFFF产品特定参数
0xC000 - 0xFFFF厂商自定义参数

常用标准 IDN(伺服控制相关):

IDNSERCOS 名说明数据类型
S-0-0001 (0x0001)Cycle Time通信周期时间uint, us
S-0-0011 (0x000B)Position Feedback 1实际位置int, inc
S-0-0012 (0x000C)Position Command目标位置int, inc
S-0-0013 (0x000D)Velocity Feedback实际速度int, rpm
S-0-0014 (0x000E)Velocity Command目标速度int, rpm
S-0-0016 (0x0010)AT Config输入映射配置列表list
S-0-0019 (0x0013)Drive Status驱动器状态字int
S-0-0024 (0x0018)MDT Config输出映射配置列表list
S-0-0036 (0x0024)Max Velocity最大速度限制uint, rpm
S-0-0040 (0x0028)Homing Velocity回零速度uint, rpm
S-0-0064 (0x0040)Drive Control Word驱动器控制字int
S-0-0071 (0x0047)Drive Status Word驱动器状态字int

元素标志(Element Flags)— 读取 IDN 的不同"层面":

标志含义
0x01数据状态
0x02参数名称(字符串)
0x04属性(数据类型/长度/权限位域)
0x08单位(字符串)
0x10最小值
0x20最大值
0x40数据值(默认,最常用)
0x80默认值

SoEElementFlags 位标志

SoEElementFlags 是 ETG.1000.6 §5.11 SoEHeader 的 7 位 element 位掩码 (IntFlag), 多位可 OR 组合一次读 / 写多个层面.

from ethercat.slave.soe import SoEElementFlags
标志含义
NONE0x00
DATA_STATUS0x01数据状态字 (valid/invalid/busy)
NAME0x02IDN 名称 (字符串)
ATTRIBUTE0x04属性 (类型 / 长度 / 小数位等)
UNIT0x08单位字符串
MIN0x10最小值
MAX0x20最大值
VALUE0x40操作数据值 (默认, 最常用)
DEFAULT0x80默认值

示例:

from ethercat.slave.soe import SoEElementFlags

# 一次性读取 Name + Unit + Value
flags = SoEElementFlags.NAME | SoEElementFlags.UNIT | SoEElementFlags.VALUE
data = slave.soe.read(0x000B, int(flags))

SERCOS IDN 编 / 解码

encode_idn()

def encode_idn(is_standard: bool, parameter_set: int, data_block: int) -> int

(is_standard, set, block) 三元组编码为 16 位 IDN.

  • is_standard=True → bit15=0 (S 标准参数)
  • is_standard=False → bit15=1 (P 厂商参数)
  • parameter_set 取 0..7 → bit14..12
  • data_block 取 0..0xFFF → bit11..0

示例:

from ethercat.slave.soe import encode_idn

# S-0-0040 (Velocity command value)
idn = encode_idn(True, 0, 40) # -> 0x0028

# P-1-0123
idn = encode_idn(False, 1, 0x123) # -> 0x9123

decode_idn()

def decode_idn(idn: int) -> Tuple[bool, int, int]

把 16 位 IDN 拆回 (is_standard, parameter_set, data_block) 三元组.

示例:

from ethercat.slave.soe import decode_idn

is_std, set_no, block = decode_idn(0x8000)
# is_std=False, set_no=0, block=0 (即 P-0-0000)

try_parse_sercos_idn()

def try_parse_sercos_idn(text: str) -> Optional[int]

把 SERCOS 标准字符串 ("S-0-0040" / "P-1-0123") 解析为 16 位 IDN, 失败返回 None.

示例:

from ethercat.slave.soe import try_parse_sercos_idn

idn = try_parse_sercos_idn("S-0-0040") # -> 0x0028
idn = try_parse_sercos_idn("invalid") # -> None

format_sercos_idn()

def format_sercos_idn(idn: int) -> str

把 16 位 IDN 格式化为 "S-x-xxxx" / "P-x-xxxx" 字符串.

示例:

from ethercat.slave.soe import format_sercos_idn

text = format_sercos_idn(0x0028) # -> "S-0-0040"
text = format_sercos_idn(0x9123) # -> "P-1-0291"

基本读写

read()

def read(self, idn: int, element_flags: int = 0x40) -> Optional[bytes]

读取 IDN 参数原始字节。使用构造时设置的超时时间(默认 5000ms)。

示例:

import struct

# 读取通信周期时间 (S-0-0001)
data = slave.soe.read(0x0001, 0x40)
if data is not None:
cycle_time = struct.unpack('<I', data)[0]
print(f"周期时间: {cycle_time} us")

write()

def write(self, idn: int, data: bytes, element_flags: int = 0x40) -> bool

写入 IDN 参数。使用构造时设置的超时时间(默认 5000ms)。

示例:

import struct

# 设置目标位置 (S-0-0012)
position = struct.pack('<i', 100000)
success = slave.soe.write(0x000C, position)

类型化读取

所有方法签名一致:read_xxx(idn, element_flags=0x40)

方法返回类型说明
read_int16Optional[int]读取 16 位有符号整数
read_int32Optional[int]读取 32 位有符号整数
read_uint16Optional[int]读取 16 位无符号整数
read_uint32Optional[int]读取 32 位无符号整数
read_floatOptional[float]读取单精度浮点数
read_doubleOptional[float]读取双精度浮点数
read_stringOptional[str]读取字符串

示例:

# 读取实际位置 (S-0-0011)
actual_pos = slave.soe.read_int32(0x000B)

# 读取驱动器状态字 (S-0-0071)
status = slave.soe.read_uint16(0x0047)

# 读取参数名称
name = slave.soe.read_string(0x000B, element_flags=0x02)

类型化写入

所有方法签名一致:write_xxx(idn, value, element_flags=0),返回 bool

方法值类型说明
write_int16int写入 16 位有符号整数
write_int32int写入 32 位有符号整数
write_uint16int写入 16 位无符号整数
write_uint32int写入 32 位无符号整数

示例:

# 设置目标位置 (S-0-0012)
slave.soe.write_int32(0x000C, 50000)

# 写控制字 (S-0-0064)
slave.soe.write_uint16(0x0040, 0x0006)

元数据读取

def read_name(self, idn: int) -> Optional[str]
def read_unit(self, idn: int) -> Optional[str]
def read_min_value(self, idn: int) -> Optional[bytes]
def read_max_value(self, idn: int) -> Optional[bytes]
def read_default_value(self, idn: int) -> Optional[bytes]

示例:

name = slave.soe.read_name(0x000B)
unit = slave.soe.read_unit(0x000B)
min_val = slave.soe.read_min_value(0x000B)
max_val = slave.soe.read_max_value(0x000B)

print(f"参数: {name} ({unit})")
if min_val and max_val:
print(f"范围: {struct.unpack('<i', min_val)[0]} - {struct.unpack('<i', max_val)[0]}")

参数信息

get_available_idns()

def get_available_idns(self) -> List[int]

获取从站支持的所有 IDN 列表。

get_parameter_info()

def get_parameter_info(self, idn: int) -> SoEParameter

获取 IDN 参数的完整信息(名称、单位、属性、当前值、最小/最大值等)。

返回值:

  • SoEParameter — 包含完整的参数信息

SoEParameter 类:

class SoEParameter:
idn: int # IDN 编号
name: str # 参数名称
unit: str # 单位
attributes: SoEAttributes # 属性(数据类型、长度等)
value: bytes # 当前值
default_value: bytes # 默认值
min_value: bytes # 最小值
max_value: bytes # 最大值
sercos_idn: str # SERCOS IDN 格式(如 "S-0-0001")
category: str # 类别("Standard" / "Product" / "Vendor")
is_read_only: bool # 是否只读
access_mode: str # 访问权限("RO" / "RW" / "RW*")
formatted_value: str # 格式化的当前值
formatted_min_value: str # 格式化的最小值
formatted_max_value: str # 格式化的最大值

SoEAttributes 结构:

class SoEAttributes:
evaluation_factor: int # 评价因子
length: int # 数据长度
is_list: bool # 是否为列表
is_command: bool # 是否为命令
data_type: SoEDataType # 数据类型
decimals: int # 小数位数
write_protected_preop: bool # PreOp 状态写保护
write_protected_safeop: bool # SafeOp 状态写保护
write_protected_op: bool # Op 状态写保护

SoEDataType 枚举:

class SoEDataType(IntEnum):
BINARY = 0 # 二进制
UINT = 1 # 无符号整数
INT = 2 # 有符号整数
HEXADECIMAL = 3 # 十六进制
STRING = 4 # 字符串
IDN = 5 # IDN 引用
FLOAT = 6 # 浮点数
PARAMETER = 7 # 参数

示例:

param = slave.soe.get_parameter_info(0x000B)
print(f"IDN: {param.sercos_idn} - {param.name}")
print(f"单位: {param.unit}, 访问: {param.access_mode}")
print(f"当前值: {param.formatted_value}")
print(f"范围: {param.formatted_min_value} - {param.formatted_max_value}")

AT/MDT 映射

SERCOS 使用 AT(Acknowledge Telegram)和 MDT(Master Data Telegram)传输周期数据:

  • AT = 从站到主站的周期输入(实际位置、实际速度、状态字...)
  • MDT = 主站到从站的周期输出(目标位置、目标速度、控制字...)

映射配置决定了哪些 IDN 参数被包含在周期数据帧中。

get_idn_mapping()

def get_idn_mapping(self) -> SoEMappingInfo

读取当前驱动器的 AT/MDT 映射配置。

返回值:

  • SoEMappingInfo — 映射信息对象

SoEMappingInfo 类:

class SoEMappingInfo:
at_mapping: List[SoEMappingEntry] # AT(输入)映射列表
mdt_mapping: List[SoEMappingEntry] # MDT(输出)映射列表
at_bit_size: int # AT 总位数
mdt_bit_size: int # MDT 总位数
at_byte_size: int # AT 总字节数
mdt_byte_size: int # MDT 总字节数

SoEMappingEntry 类:

class SoEMappingEntry:
idn: int # IDN 编号
name: str # 参数名称
bit_length: int # 数据长度(位)
byte_length: int # 数据长度(字节)

get_all_drive_mappings()

def get_all_drive_mappings(self) -> Dict[int, SoEMappingInfo]

获取所有驱动器的映射信息(最多扫描 8 个驱动器)。

返回值:

  • Dict[int, SoEMappingInfo] — 驱动器编号到映射信息的字典
SoEAdvanced 为高级 API

AT/MDT 映射查询通过 SoEAdvanced 类提供(继承自 SoE,位于 soe.py 模块),属于内部高级 API, 仅在需要 IDN 周期数据布局扫描时使用。常规 IDN 读写/程序命令走 slave.soe (基础接口) 即可。 高级 API 的句柄参数后续可能调整, 请勿在生产代码中长期固化该构造形式。

from ethercat import SoEAdvanced

soe_adv = SoEAdvanced(master._dll, master.master_index, slave.slave_num)
mapping = soe_adv.get_idn_mapping()
print(f"AT 映射(输入 {mapping.at_byte_size} 字节):")
for entry in mapping.at_mapping:
print(f" IDN 0x{entry.idn:04X}: {entry.name} ({entry.byte_length}B)")

print(f"MDT 映射(输出 {mapping.mdt_byte_size} 字节):")
for entry in mapping.mdt_mapping:
print(f" IDN 0x{entry.idn:04X}: {entry.name} ({entry.byte_length}B)")

程序命令

execute_command()

def execute_command(self, idn: int, timeout_ms: int = 5000, poll_interval_ms: int = 50) -> bool

执行 SoE 程序命令(SERCOS Procedure Command)。

示例:

# 执行回零命令 (S-0-0148)
success = slave.soe.execute_command(0x0094)
if success:
print("回零完成")

# 自定义超时(长时间命令)
ok = slave.soe.execute_command(0x0094, timeout_ms=30000, poll_interval_ms=100)

read_data_state()

def read_data_state(self, idn: int) -> Optional[int]

读取 IDN 的 Data State 元素(ElementFlag 0x01)。Data State 包含命令状态、数据有效性等信息。

参数:

  • idn (int) — IDN 编号

返回值:

  • int | None — Data State 值(uint16),失败返回 None

示例:

# 读取 IDN 的 Data State
state = slave.soe.read_data_state(0x0094)
if state is not None:
command_active = (state & 0x01) != 0
command_error = (state & 0x02) != 0
print(f"命令激活: {command_active}, 命令错误: {command_error}")

参数变化通知

SoE Notification 功能允许监控 SERCOS 从站 IDN 参数的变化。当参数值发生改变时,通过回调通知应用程序。

实现机制:

  1. 硬件通知:通过写入 S-0-0127(通知请求 IDN)尝试启用从站的原生通知
  2. 轮询检测:如果从站不支持硬件通知,自动回退到轮询模式

on_notification()

def on_notification(self, callback: Callable[[SoENotificationEventArgs], None])

注册参数变化通知回调函数。

enable_notification()

def enable_notification(self, idn: int, timeout_ms: int = 500) -> bool

启用指定 IDN 的参数变化通知。首先尝试通过 S-0-0127 启用硬件通知,无论是否成功都会将 IDN 加入轮询监控列表。

返回值:

  • boolTrue 表示从站确认支持硬件通知;False 表示回退到轮询模式

disable_notification()

def disable_notification(self, idn: int)

禁用指定 IDN 的参数变化通知。

disable_all_notifications()

def disable_all_notifications(self)

禁用所有已启用的通知,清空监控列表。

poll_notifications()

def poll_notifications(self, timeout_ms: int = 200) -> int

轮询检测所有已监控 IDN 的参数变化。发现变化时触发已注册的回调。

返回值:

  • int — 本次轮询检测到变化的 IDN 数量

monitored_idns 属性

@property
def monitored_idns(self) -> List[int]

获取当前正在监控的 IDN 列表。

SoENotificationEventArgs

class SoENotificationEventArgs:
slave_index: int # 从站索引
drive_number: int # 驱动器编号
idn: int # 变化的 IDN 编号
new_value: bytes # 新值
old_value: bytes # 旧值
sercos_idn: str # 格式化的 SERCOS IDN 名称 (如 "S-0-0127")

示例:

if slave.soe is not None:
# 注册通知回调
def on_change(args):
print(f"参数变化: {args.sercos_idn} (IDN 0x{args.idn:04X})")
print(f" 旧值: {args.old_value.hex()}")
print(f" 新值: {args.new_value.hex()}")

slave.soe.on_notification(on_change)

# 启用监控实际位置 (S-0-0011)
hw_supported = slave.soe.enable_notification(0x000B)
print(f"硬件通知: {'支持' if hw_supported else '回退到轮询'}")

# 启用监控驱动器状态字 (S-0-0071)
slave.soe.enable_notification(0x0047)

# 周期性轮询检测
changed = slave.soe.poll_notifications()
if changed > 0:
print(f"检测到 {changed} 个参数变化")

# 查看已监控的 IDN
print(f"监控列表: {slave.soe.monitored_idns}")

# 停止监控
slave.soe.disable_all_notifications()

全局 Notification / Emergency 分发器

除了上述 SoE 实例级的 on_notification() / enable_notification() 回调, SDK 还提供 全局单实例分发器, 统一监听所有从站的 SoE Notification (OpCode=5) 与 SoE Emergency (OpCode=6) 事件。

分发器在底层邮箱接收线程中同步触发, 多个 Python 回调按注册顺序执行。

适用场景
  • 全局日志记录: 一个回调统一记录所有 SoE 从站的参数变化
  • Emergency 告警系统: SERCOS 驱动器发送致命错误帧 (OpCode=6) 时统一上报
  • 多从站场景: 避免为每个 slave.soe 实例重复注册回调

SoENotificationEventArgs

全局分发器的 Notification 参数:

@dataclass
class SoENotificationEventArgs:
slave_index: int # 从站索引
drive_number: int # 驱动器编号 (多轴设备)
element_flags: int # Element 标志 (0x40=Value / 0x02=Name / 0x04=Attribute ...)
idn: int # IDN 编号
new_value: Optional[bytes] # 新值
old_value: Optional[bytes] # 旧值 (可能为 None)
timestamp: datetime # 接收时间 (UTC)

@property
def sercos_idn(self) -> str # "S-0-0127" 格式

SoEEmergencyEventArgs

SERCOS Emergency 帧 (OpCode=6) 参数, 由从站异步推送的致命错误:

@dataclass
class SoEEmergencyEventArgs:
slave_index: int # 从站索引
drive_number: int # 驱动器编号
error_code: int # Sercos Emergency Error Code (uint16)
timestamp: datetime # 接收时间 (UTC)

注册 / 移除回调

from ethercat.slave.soe import (
add_soe_notification_callback,
remove_soe_notification_callback,
add_soe_emergency_callback,
remove_soe_emergency_callback,
)

def add_soe_notification_callback(
callback: Callable[[SoENotificationEventArgs], None]
) -> None

def remove_soe_notification_callback(
callback: Callable[[SoENotificationEventArgs], None]
) -> None

def add_soe_emergency_callback(
callback: Callable[[SoEEmergencyEventArgs], None]
) -> None

def remove_soe_emergency_callback(
callback: Callable[[SoEEmergencyEventArgs], None]
) -> None

示例 — 全局 Notification 监听:

from ethercat.slave.soe import add_soe_notification_callback

def on_notif(e):
print(f"[{e.timestamp:%H:%M:%S}] Slave {e.slave_index} "
f"IDN {e.sercos_idn} (0x{e.idn:04X}) "
f"element=0x{e.element_flags:02X} 变化")
if e.new_value:
print(f" new = {e.new_value.hex()}")

add_soe_notification_callback(on_notif)

示例 — Emergency 告警:

from ethercat.slave.soe import add_soe_emergency_callback

def on_emcy(e):
print(f"[EMCY] Slave {e.slave_index} drive={e.drive_number} "
f"error=0x{e.error_code:04X} at {e.timestamp:%H:%M:%S.%f}")

add_soe_emergency_callback(on_emcy)
回调线程模型

Python 回调在 底层邮箱接收线程 中同步执行:

  1. 不要长时间阻塞 — 应快速返回, 否则后续 SoE 邮箱会被延迟处理
  2. 访问 Tk / WinForms / Qt 控件需要跨线程 — 使用 queue.Queue 或各框架的 after/invoke 转主线程
  3. 异常会被 SDK 吞掉 — 确保回调内部捕获异常并自行记录日志
import queue

ui_queue: queue.Queue = queue.Queue()

def on_emcy(e):
try:
ui_queue.put_nowait(("emcy", e))
except queue.Full:
pass # UI 慢, 丢弃

标准参考: IEC 61800-7-204 (SERCOS over EtherCAT) §Service Channel Communication.

标准 IDN 常量

class StandardIDN:
IDN_LIST = 0x0000 # IDN 列表 (S-0-0000)
CYCLE_TIME = 0x0001 # 通信周期时间 (S-0-0001)
AT_CONFIG = 0x0010 # AT 配置 - 输入映射 (S-0-0016)
MDT_CONFIG = 0x0018 # MDT 配置 - 输出映射 (S-0-0024)
NOTIFICATION_REQUEST = 0x007F # 通知请求 (S-0-0127)

完整示例

参数扫描

if slave.soe is not None:
idns = slave.soe.get_available_idns()
print(f"从站支持 {len(idns)} 个 IDN 参数\n")

for idn in idns[:10]:
info = slave.soe.get_parameter_info(idn)
print(f"IDN 0x{info['idn']:04X}: {info['name']} ({info['unit']})")

伺服驱动器控制

if slave.soe is not None:
# 1. 读取驱动器状态
status_word = slave.soe.read_uint16(0x0047)
print(f"状态字: 0x{status_word:04X}")

# 2. 查看 AT/MDT 映射(了解周期数据布局)
from ethercat import SoEAdvanced
soe_adv = SoEAdvanced(master._dll, master.master_index, slave.slave_num)
mapping = soe_adv.get_idn_mapping()

print(f"\nAT 映射(输入 {mapping.at_byte_size} 字节):")
for entry in mapping.at_mapping:
print(f" IDN 0x{entry.idn:04X}: {entry.name} ({entry.byte_length}B)")

print(f"\nMDT 映射(输出 {mapping.mdt_byte_size} 字节):")
for entry in mapping.mdt_mapping:
print(f" IDN 0x{entry.idn:04X}: {entry.name} ({entry.byte_length}B)")

# 3. 写控制字使能驱动器 (S-0-0064)
slave.soe.write_uint16(0x0040, 0x0006)

# 4. 设置目标位置 (S-0-0012)
slave.soe.write_int32(0x000C, 50000)

# 5. 读取实际位置 (S-0-0011)
actual_pos = slave.soe.read_int32(0x000B)
print(f"\n实际位置: {actual_pos} inc")